From e390c57e3f6c9960251de1befc38fc0b740aef64 Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 21 Mar 2024 10:29:54 +0100 Subject: [PATCH] Client Stability & Test Suite Improvements (#134) --- .github/workflows/build.yml | 6 +- Benchmarks/ClientBenchmarkApp/README.md | 28 ++++ Source/HiveMQtt/Client/HiveMQClient.cs | 57 ++++---- Source/HiveMQtt/Client/HiveMQClientSocket.cs | 116 ++++++++++------ .../Client/HiveMQClientTrafficProcessor.cs | 99 +++++++------- Source/HiveMQtt/Client/HiveMQClientUtil.cs | 6 +- .../Client/Options/HiveMQClientOptions.cs | 5 + Source/HiveMQtt/HiveMQtt.csproj | 8 +- .../HiveMQtt.Test/HiveMQClient/ClientTest.cs | 126 ++++++++++++++++++ .../HiveMQClient/ConnectOptions.cs | 26 ---- .../HiveMQtt.Test/HiveMQClient/OptionsTest.cs | 6 +- Tests/HiveMQtt.Test/HiveMQtt.Test.csproj | 4 + Tests/HiveMQtt.Test/xunit.runner.json | 16 +++ 13 files changed, 352 insertions(+), 151 deletions(-) create mode 100644 Tests/HiveMQtt.Test/xunit.runner.json diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 3ff5884b..69adfd97 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -60,9 +60,9 @@ jobs: - name: "Dotnet Cake Build" run: dotnet cake --target=Build shell: pwsh - # - name: "Dotnet Cake Test" - # run: dotnet cake --target=Test - # shell: pwsh + - name: "Dotnet Cake Test" + run: dotnet cake --target=Test + shell: pwsh - name: "Dotnet Cake Pack" run: dotnet cake --target=Pack shell: pwsh diff --git a/Benchmarks/ClientBenchmarkApp/README.md b/Benchmarks/ClientBenchmarkApp/README.md index 99b5e89b..4e7791ec 100644 --- a/Benchmarks/ClientBenchmarkApp/README.md +++ b/Benchmarks/ClientBenchmarkApp/README.md @@ -3,3 +3,31 @@ The benchmarks are built with [BenchmarkDotNet](https://benchmarkdotnet.org) and can be run with: `dotnet run ClientBenchmarkApp.csproj -c Release` + +# Results - Mar 21, 2024 + +With release [v0.11.0](https://github.com/hivemq/hivemq-mqtt-client-dotnet/releases/tag/v0.11.0) there was a big performance improvement. All messaging performance was improved but particularly publishing a QoS level 2 message went from ~206ms down to ~1.6ms. + +## Previous Performance + +| Method | Mean | Error | StdDev | Median | +|------------------------------------------ |-------------:|------------:|------------:|---------------:| +| 'Publish a QoS 0 message' | 390.8 us | 1,842.5 us | 1,218.7 us | 5.646 us | +| 'Publish a QoS 1 message' | 103,722.8 us | 4,330.0 us | 2,864.1 us | 103,536.375 us | +| 'Publish a QoS 2 message' | 202,367.9 us | 26,562.9 us | 17,569.7 us | 206,959.834 us | + +## First Pass Refactor Performance + +| Method | Mean | Error | StdDev | Median | +|------------------------------------------ |-----------:|-----------:|-----------:|-------------:| +| 'Publish a QoS 0 message' | 401.9 us | 1,876.3 us | 1,241.0 us | 9.250 us | +| 'Publish a QoS 1 message' | 2,140.0 us | 3,568.2 us | 2,360.1 us | 1,324.251 us | +| 'Publish a QoS 2 message' | 4,217.2 us | 5,803.7 us | 3,838.8 us | 2,569.166 us | + +## Final Refactor Performance Results (for now 👻) + +| Method | Mean | Error | StdDev | Median | +|------------------------------------------ |------------:|----------:|------------:|-------------:| +| 'Publish a QoS 0 message' | 47.11 us | 139.47 us | 411.23 us | 4.875 us | +| 'Publish a QoS 1 message' | 1,210.71 us | 508.64 us | 1,499.75 us | 790.645 us | +| 'Publish a QoS 2 message' | 2,080.46 us | 591.38 us | 1,743.71 us | 1,653.083 us | diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index e875bce2..dc72de5d 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -37,10 +37,12 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient { private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger(); - private ConnectState connectState = ConnectState.Disconnected; + internal ConnectState ConnectState { get; set; } public HiveMQClient(HiveMQClientOptions? options = null) { + this.ConnectState = ConnectState.Disconnected; + options ??= new HiveMQClientOptions(); options.Validate(); @@ -66,12 +68,12 @@ public HiveMQClient(HiveMQClientOptions? options = null) public List Subscriptions { get; } = new(); /// - public bool IsConnected() => this.connectState == ConnectState.Connected; + public bool IsConnected() => this.ConnectState == ConnectState.Connected; /// public async Task ConnectAsync() { - this.connectState = ConnectState.Connecting; + this.ConnectState = ConnectState.Connecting; Logger.Info("Connecting to broker at {0}:{1}", this.Options.Host, this.Options.Port); @@ -89,7 +91,7 @@ public async Task ConnectAsync() // Construct the MQTT Connect packet and queue to send var connPacket = new ConnectPacket(this.Options); Logger.Trace($"Queuing packet for send: {connPacket}"); - this.sendQueue.Add(connPacket); + this.SendQueue.Add(connPacket); // FIXME: Cancellation token and better timeout value ConnAckPacket connAck; @@ -100,7 +102,7 @@ public async Task ConnectAsync() } catch (TimeoutException) { - this.connectState = ConnectState.Disconnected; + this.ConnectState = ConnectState.Disconnected; throw new HiveMQttClientException("Connect timeout. No response received in time."); } finally @@ -111,11 +113,11 @@ public async Task ConnectAsync() if (connAck.ReasonCode == ConnAckReasonCode.Success) { - this.connectState = ConnectState.Connected; + this.ConnectState = ConnectState.Connected; } else { - this.connectState = ConnectState.Disconnected; + this.ConnectState = ConnectState.Disconnected; } connectResult = new ConnectResult(connAck.ReasonCode, connAck.SessionPresent, connAck.Properties); @@ -133,7 +135,7 @@ public async Task ConnectAsync() /// public async Task DisconnectAsync(DisconnectOptions? options = null) { - if (this.connectState != ConnectState.Connected) + if (this.ConnectState != ConnectState.Connected) { Logger.Warn("DisconnectAsync: Client is not connected."); return false; @@ -152,7 +154,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) }; // Once this is set, no more incoming packets or outgoing will be accepted - this.connectState = ConnectState.Disconnecting; + this.ConnectState = ConnectState.Disconnecting; var taskCompletionSource = new TaskCompletionSource(); void TaskHandler(object? sender, OnDisconnectSentEventArgs args) => taskCompletionSource.SetResult(args.DisconnectPacket); @@ -160,7 +162,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) this.OnDisconnectSent += eventHandler; Logger.Trace($"Queuing packet for send: {disconnectPacket}"); - this.sendQueue.Add(disconnectPacket); + this.SendQueue.Add(disconnectPacket); try { @@ -176,26 +178,35 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) this.OnDisconnectSent -= eventHandler; } - // Close the socket + this.HandleDisconnection(); + + return true; + } + + /// + /// Close the socket and set the connect state to disconnected. + /// + private void HandleDisconnection() + { + Logger.Debug("HandleDisconnection: Connection lost. Handling Disconnection."); + this.CloseSocket(); // Fire the corresponding event - this.AfterDisconnectEventLauncher(true); + this.AfterDisconnectEventLauncher(false); - this.connectState = ConnectState.Disconnected; + this.ConnectState = ConnectState.Disconnected; // FIXME - if (this.sendQueue.Count > 0) + if (this.SendQueue.Count > 0) { - Logger.Warn("Disconnect: Send queue not empty. Packets pending but we are disconnecting."); + Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting (or were disconnected)."); } // We only clear the send queue on explicit disconnect - while (this.sendQueue.TryTake(out _)) + while (this.SendQueue.TryTake(out _)) { } - - return true; } /// @@ -210,7 +221,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) if (message.QoS == QualityOfService.AtMostOnceDelivery) { Logger.Trace($"Queuing packet for send: {publishPacket}"); - this.sendQueue.Add(publishPacket); + this.SendQueue.Add(publishPacket); return new PublishResult(publishPacket.Message); } else if (message.QoS == QualityOfService.AtLeastOnceDelivery) @@ -223,7 +234,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) // Construct the MQTT Connect packet and queue to send Logger.Trace($"Queuing packet for send: {publishPacket}"); - this.sendQueue.Add(publishPacket); + this.SendQueue.Add(publishPacket); var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); @@ -239,7 +250,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) publishPacket.OnPublishQoS2Complete += eventHandler; // Construct the MQTT Connect packet and queue to send - this.sendQueue.Add(publishPacket); + this.SendQueue.Add(publishPacket); // Wait on the QoS 2 handshake var packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); @@ -322,7 +333,7 @@ public async Task SubscribeAsync(SubscribeOptions options) this.OnSubAckReceived += eventHandler; // Queue the constructed packet to be sent on the wire - this.sendQueue.Add(subscribePacket); + this.SendQueue.Add(subscribePacket); SubAckPacket subAck; SubscribeResult subscribeResult; @@ -426,7 +437,7 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp EventHandler eventHandler = TaskHandler; this.OnUnsubAckReceived += eventHandler; - this.sendQueue.Add(unsubscribePacket); + this.SendQueue.Add(unsubscribePacket); // FIXME: Cancellation token and better timeout value UnsubAckPacket unsubAck; diff --git a/Source/HiveMQtt/Client/HiveMQClientSocket.cs b/Source/HiveMQtt/Client/HiveMQClientSocket.cs index f25be2cf..689dc7ee 100644 --- a/Source/HiveMQtt/Client/HiveMQClientSocket.cs +++ b/Source/HiveMQtt/Client/HiveMQClientSocket.cs @@ -30,18 +30,23 @@ namespace HiveMQtt.Client; /// public partial class HiveMQClient : IDisposable, IHiveMQClient { - private Socket? socket; - private Stream? stream; - private PipeReader? reader; - private PipeWriter? writer; + internal Socket? Socket { get; set; } + + internal Stream? Stream { get; set; } + + internal PipeReader? Reader { get; set; } + + internal PipeWriter? Writer { get; set; } + private CancellationTokenSource cancellationTokenSource; -#pragma warning disable IDE0052 - private Task? connectionWriterTask; - private Task? connectionReaderTask; - private Task? receivedPacketsHandlerAsync; - private Task? connectionMonitorTask; -#pragma warning restore IDE0052 + internal Task? ConnectionWriterTask { get; set; } + + internal Task? ConnectionReaderTask { get; set; } + + internal Task? ReceivedPacketsHandlerTask { get; set; } + + internal Task? ConnectionMonitorTask { get; set; } /// /// SSLStream Callback. This is used to always allow invalid broker certificates. @@ -137,29 +142,29 @@ internal async Task ConnectSocketAsync() IPEndPoint ipEndPoint = new(ipAddress, this.Options.Port); - this.socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + this.Socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); try { - await this.socket.ConnectAsync(ipEndPoint).ConfigureAwait(false); + await this.Socket.ConnectAsync(ipEndPoint).ConfigureAwait(false); } catch (SocketException socketException) { throw new HiveMQttClientException("Failed to connect to broker", socketException); } - var socketConnected = this.socket.Connected; - if (!socketConnected || this.socket == null) + var socketConnected = this.Socket.Connected; + if (!socketConnected || this.Socket == null) { throw new HiveMQttClientException("Failed to connect socket"); } // Setup the stream - this.stream = new NetworkStream(this.socket); + this.Stream = new NetworkStream(this.Socket); if (this.Options.UseTLS) { - var result = await this.CreateTLSConnectionAsync(this.stream).ConfigureAwait(false); + var result = await this.CreateTLSConnectionAsync(this.Stream).ConfigureAwait(false); if (!result) { throw new HiveMQttClientException("Failed to create TLS connection"); @@ -167,20 +172,20 @@ internal async Task ConnectSocketAsync() } // Setup the Pipeline - this.reader = PipeReader.Create(this.stream); - this.writer = PipeWriter.Create(this.stream); + this.Reader = PipeReader.Create(this.Stream); + this.Writer = PipeWriter.Create(this.Stream); // Reset the CancellationTokenSource in case this is a reconnect this.cancellationTokenSource.Dispose(); this.cancellationTokenSource = new CancellationTokenSource(); // Start the traffic processors - this.connectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token); - this.connectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token); - this.receivedPacketsHandlerAsync = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token); - this.connectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token); + this.ConnectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token); + this.ConnectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token); + this.ReceivedPacketsHandlerTask = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token); + this.ConnectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token); - Logger.Trace($"Socket connected to {this.socket.RemoteEndPoint}"); + Logger.Trace($"Socket connected to {this.Socket.RemoteEndPoint}"); return socketConnected; } @@ -211,18 +216,18 @@ private async Task CreateTLSConnectionAsync(Stream stream) try { Logger.Trace("Authenticating TLS connection"); - this.stream = new SslStream(stream); - await ((SslStream)this.stream).AuthenticateAsClientAsync(tlsOptions).ConfigureAwait(false); - - Logger.Info($"Connected via TLS: {((SslStream)this.stream).IsEncrypted}"); - Logger.Debug($"Cipher Algorithm: {((SslStream)this.stream).CipherAlgorithm}"); - Logger.Debug($"Cipher Strength: {((SslStream)this.stream).CipherStrength}"); - Logger.Debug($"Hash Algorithm: {((SslStream)this.stream).HashAlgorithm}"); - Logger.Debug($"Hash Strength: {((SslStream)this.stream).HashStrength}"); - Logger.Debug($"Key Exchange Algorithm: {((SslStream)this.stream).KeyExchangeAlgorithm}"); - Logger.Debug($"Key Exchange Strength: {((SslStream)this.stream).KeyExchangeStrength}"); - - var remoteCertificate = ((SslStream)this.stream).RemoteCertificate; + this.Stream = new SslStream(stream); + await ((SslStream)this.Stream).AuthenticateAsClientAsync(tlsOptions).ConfigureAwait(false); + + Logger.Info($"Connected via TLS: {((SslStream)this.Stream).IsEncrypted}"); + Logger.Debug($"Cipher Algorithm: {((SslStream)this.Stream).CipherAlgorithm}"); + Logger.Debug($"Cipher Strength: {((SslStream)this.Stream).CipherStrength}"); + Logger.Debug($"Hash Algorithm: {((SslStream)this.Stream).HashAlgorithm}"); + Logger.Debug($"Hash Strength: {((SslStream)this.Stream).HashStrength}"); + Logger.Debug($"Key Exchange Algorithm: {((SslStream)this.Stream).KeyExchangeAlgorithm}"); + Logger.Debug($"Key Exchange Strength: {((SslStream)this.Stream).KeyExchangeStrength}"); + + var remoteCertificate = ((SslStream)this.Stream).RemoteCertificate; if (remoteCertificate != null) { Logger.Info($"Remote Certificate Subject: {remoteCertificate.Subject}"); @@ -230,7 +235,7 @@ private async Task CreateTLSConnectionAsync(Stream stream) Logger.Info($"Remote Certificate Serial Number: {remoteCertificate.GetSerialNumberString()}"); } - Logger.Info($"TLS Protocol: {((SslStream)this.stream).SslProtocol}"); + Logger.Info($"TLS Protocol: {((SslStream)this.Stream).SslProtocol}"); return true; } catch (Exception e) @@ -251,16 +256,43 @@ internal bool CloseSocket(bool? shutdownPipeline = true) // Cancel the background traffic processing tasks this.cancellationTokenSource.Cancel(); + // Reset the tasks + this.ConnectionWriterTask = null; + this.ConnectionReaderTask = null; + this.ReceivedPacketsHandlerTask = null; + this.ConnectionMonitorTask = null; + if (shutdownPipeline == true) { - // Shutdown the pipeline - this.reader = null; - this.writer = null; + if (this.Reader != null && this.Writer != null) + { + // Dispose of the PipeReader and PipeWriter + this.Reader.Complete(); + this.Writer.Complete(); + + // Shutdown the pipeline + this.Reader = null; + this.Writer = null; + } + } + + if (this.Stream != null) + { + // Dispose of the Stream + this.Stream.Close(); + this.Stream.Dispose(); + this.Stream = null; } - // Shutdown the socket - this.socket?.Shutdown(SocketShutdown.Both); - this.socket?.Close(); + // Check if the socket is initialized and open + if (this.Socket != null && this.Socket.Connected) + { + // Shutdown the socket + this.Socket.Shutdown(SocketShutdown.Both); + this.Socket.Close(); + this.Socket.Dispose(); + this.Socket = null; + } return true; } diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index 44fb677a..2be66d5c 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -30,9 +30,9 @@ namespace HiveMQtt.Client; /// public partial class HiveMQClient : IDisposable, IHiveMQClient { - private readonly BlockingCollection sendQueue = new(); + internal BlockingCollection SendQueue { get; } = new(); - private readonly BlockingCollection receivedQueue = new(); + internal BlockingCollection ReceivedQueue { get; } = new(); // Transactional packets indexed by packet identifier private readonly ConcurrentDictionary> transactionQueue = new(); @@ -49,7 +49,7 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) = async () => { var keepAlivePeriod = this.Options.KeepAlive / 2; - Logger.Trace($"{this.Options.ClientId}-(CM)- Starting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- Starting...{this.ConnectState}"); while (true) { @@ -60,13 +60,13 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) = } // If connected and no recent traffic, send a ping - if (this.connectState == ConnectState.Connected) + if (this.ConnectState == ConnectState.Connected) { if (this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod)) { // Send PingReq Logger.Trace($"{this.Options.ClientId}-(CM)- --> PingReq"); - this.sendQueue.Add(new PingReqPacket()); + this.SendQueue.Add(new PingReqPacket()); } } @@ -81,7 +81,7 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) = } } - Logger.Trace($"{this.Options.ClientId}-(CM)- Exiting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- Exiting...{this.ConnectState}"); return true; }, cancellationToken); @@ -93,26 +93,26 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => async () => { this.lastCommunicationTimer.Start(); - Logger.Trace($"{this.Options.ClientId}-(W)- Starting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(W)- Starting...{this.ConnectState}"); while (true) { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.sendQueue.Count} packets remaining."); + Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.SendQueue.Count} packets remaining."); break; } - while (this.connectState == ConnectState.Disconnected) + while (this.ConnectState == ConnectState.Disconnected) { Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect..."); await Task.Delay(2000).ConfigureAwait(false); continue; } - Logger.Trace($"{this.Options.ClientId}-(W)- {this.sendQueue.Count} packets waiting to be sent."); + Logger.Trace($"{this.Options.ClientId}-(W)- {this.SendQueue.Count} packets waiting to be sent."); - var packet = this.sendQueue.Take(); + var packet = this.SendQueue.Take(); FlushResult writeResult = default; switch (packet) @@ -189,7 +189,7 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => break; /* case AuthPacket authPacket: - /* writeResult = await this.writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false); + /* writeResult = await this.Writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false); /* this.OnAuthSentEventLauncher(authPacket); /* break; */ @@ -199,11 +199,11 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => } // switch - // if (writeResult.IsCancelled) - // { - // Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); - // break; - // } + if (writeResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); + break; + } if (writeResult.IsCompleted) { @@ -214,22 +214,22 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => this.lastCommunicationTimer.Restart(); } // foreach - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); return true; }, cancellationToken); /// /// Asynchronous background task that handles the incoming traffic of packets. Received packets - /// are queued into this.receivedQueue for processing by ReceivedPacketsHandlerAsync. + /// are queued into this.ReceivedQueue for processing by ReceivedPacketsHandlerAsync. /// private Task ConnectionReaderAsync(CancellationToken cancellationToken) => Task.Run( async () => { - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Starting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Starting...{this.ConnectState}"); ReadResult readResult; - while (this.connectState is ConnectState.Connecting or ConnectState.Connected) + while (this.ConnectState is ConnectState.Connecting or ConnectState.Connected) { if (cancellationToken.IsCancellationRequested) { @@ -239,28 +239,28 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => readResult = await this.ReadAsync().ConfigureAwait(false); - // if (readResult.IsCancelled) - // { - // Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result."); - // break; - // } + if (readResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result."); + break; + } if (readResult.IsCompleted) { Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream"); - if (this.connectState == ConnectState.Connected) + if (this.ConnectState == ConnectState.Connected) { // This is an unexpected exit and may be due to a network failure. Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: was unexpected"); - this.connectState = ConnectState.Disconnected; + this.ConnectState = ConnectState.Disconnected; // Launch the AfterDisconnect event with a clean disconnect set to false. this.AfterDisconnectEventLauncher(false); this.cancellationTokenSource.Cancel(); - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); return false; } @@ -289,24 +289,24 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => // Not enough data in the buffer to decode a packet // Advance the reader to the end of the consumed data buffer = buffer.Slice(0, consumed); - this.reader?.AdvanceTo(buffer.Start, readResult.Buffer.End); + this.Reader?.AdvanceTo(buffer.Start, readResult.Buffer.End); Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader: PacketDecoder.TryDecode returned false. Waiting for more data..."); break; } // Advance the reader to indicate how much of the buffer has been consumed buffer = buffer.Slice(consumed); - this.reader?.AdvanceTo(buffer.Start); + this.Reader?.AdvanceTo(buffer.Start); // Add the packet to the received queue for processing later // by ReceivedPacketsHandlerAsync Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name}. Adding to receivedQueue."); - this.receivedQueue.Add(decodedPacket); + this.ReceivedQueue.Add(decodedPacket); } // while (buffer.Length > 0 - } // while (this.connectState is ConnectState.Connecting or ConnectState.Connected) + } // while (this.ConnectState is ConnectState.Connecting or ConnectState.Connected) - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); return true; }, cancellationToken); @@ -318,19 +318,19 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Task.Run( () => { - Logger.Trace($"{this.Options.ClientId}-(RPH)- Starting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- Starting...{this.ConnectState}"); while (true) { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.receivedQueue.Count} received packets remaining."); + Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining."); break; } - Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.receivedQueue.Count} received packets currently waiting to be processed."); + Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.ReceivedQueue.Count} received packets currently waiting to be processed."); - var packet = this.receivedQueue.Take(); + var packet = this.ReceivedQueue.Take(); switch (packet) { case ConnAckPacket connAckPacket: @@ -340,6 +340,7 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok case DisconnectPacket disconnectPacket: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier}"); Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + this.HandleDisconnection(); this.OnDisconnectReceivedEventLauncher(disconnectPacket); break; case PingRespPacket pingRespPacket: @@ -376,7 +377,7 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok } // switch (packet) } // while - Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.connectState}"); + Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}"); return true; }, cancellationToken); @@ -394,7 +395,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) { // We've received a QoS 1 publish. Send a PubAck. var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); - this.sendQueue.Add(pubAckResponse); + this.SendQueue.Add(pubAckResponse); } else if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) { @@ -408,7 +409,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse; } - this.sendQueue.Add(pubRecResponse); + this.SendQueue.Add(pubRecResponse); } } @@ -467,13 +468,13 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) } // Send the PUBREL response - this.sendQueue.Add(pubRelResponsePacket); + this.SendQueue.Add(pubRelResponsePacket); } else { // Send a PUBREL with PacketIdentifierNotFound var pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.PacketIdentifierNotFound); - this.sendQueue.Add(pubRelResponsePacket); + this.SendQueue.Add(pubRelResponsePacket); } } @@ -509,7 +510,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) Logger.Warn($"QoS2: Couldn't remove PubRel --> PubComp QoS2 Chain for packet identifier {pubRelPacket.PacketIdentifier}."); } - this.sendQueue.Add(pubCompResponsePacket); + this.SendQueue.Add(pubCompResponsePacket); } else { @@ -518,7 +519,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) // Send a PUBCOMP with PacketIdentifierNotFound var pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.PacketIdentifierNotFound); - this.sendQueue.Add(pubCompResponsePacket); + this.SendQueue.Add(pubCompResponsePacket); } } @@ -558,12 +559,12 @@ internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket) /// Raised if the writer is null. internal ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) { - if (this.writer is null) + if (this.Writer is null) { throw new HiveMQttClientException("Writer is null"); } - return this.writer.WriteAsync(source, cancellationToken); + return this.Writer.WriteAsync(source, cancellationToken); } /// @@ -574,12 +575,12 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella /// Raised if the reader is null. internal async ValueTask ReadAsync(CancellationToken cancellationToken = default) { - if (this.reader is null) + if (this.Reader is null) { throw new HiveMQttClientException("Reader is null"); } - var readResult = await this.reader.ReadAsync(cancellationToken).ConfigureAwait(false); + var readResult = await this.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); Logger.Trace($"ReadAsync: Read Buffer Length {readResult.Buffer.Length}"); return readResult; } diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs index de6ac950..8c24b0ba 100644 --- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs +++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs @@ -183,15 +183,15 @@ protected virtual void Dispose(bool disposing) // and unmanaged resources. if (disposing) { - if (this.connectState == Internal.ConnectState.Connected) + if (this.ConnectState == Internal.ConnectState.Connected) { Logger.Trace("HiveMQClient Dispose: Disconnecting connected client."); _ = Task.Run(async () => await this.DisconnectAsync().ConfigureAwait(false)); } // Dispose managed resources. - this.sendQueue.CompleteAdding(); - this.receivedQueue.CompleteAdding(); + this.SendQueue.CompleteAdding(); + this.ReceivedQueue.CompleteAdding(); this.cancellationTokenSource.Cancel(); this.cancellationTokenSource.Dispose(); diff --git a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs index 23283087..58545804 100644 --- a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs +++ b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs @@ -224,6 +224,11 @@ public void Validate() if (this.ClientMaximumPacketSize != null) { this.ClientMaximumPacketSize = RangeValidateFourByteInteger((long)this.ClientMaximumPacketSize); + + if (this.ClientMaximumPacketSize == 0) + { + throw new HiveMQttClientException("Client Maximum Packet Size must be greater than 0."); + } } if (this.ClientTopicAliasMaximum != null) diff --git a/Source/HiveMQtt/HiveMQtt.csproj b/Source/HiveMQtt/HiveMQtt.csproj index 3cecf01e..76e1e7a2 100644 --- a/Source/HiveMQtt/HiveMQtt.csproj +++ b/Source/HiveMQtt/HiveMQtt.csproj @@ -39,11 +39,13 @@ + - + + <_Parameter1>HiveMQtt.Test + - - + PreserveNewest diff --git a/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs index 64df0862..82576539 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs @@ -1,6 +1,7 @@ namespace HiveMQtt.Test.HiveMQClient; using HiveMQtt.Client; +using HiveMQtt.Client.Internal; using Xunit; public class ClientTest @@ -38,4 +39,129 @@ public async Task Client_Has_Default_Connect_Async() Assert.True(result); } + + [Fact] + public async Task ClientStateAsync() + { + var client = new HiveMQClient(); + + // Validate private internals of the HiveMQtt client + + // Socket Stuff + Assert.Null(client.Socket); + Assert.Null(client.Stream); + Assert.Null(client.Reader); + Assert.Null(client.Writer); + + // Task Stuff + Assert.Null(client.ConnectionWriterTask); + Assert.Null(client.ConnectionReaderTask); + Assert.Null(client.ReceivedPacketsHandlerTask); + Assert.Null(client.ConnectionMonitorTask); + + // Queues + Assert.NotNull(client.SendQueue); + Assert.NotNull(client.ReceivedQueue); + + // State + Assert.Equal(ConnectState.Disconnected, client.ConnectState); + + // ************************************* + // Connect and validate internals again + // ************************************* + var connectResult = await client.ConnectAsync().ConfigureAwait(false); + Assert.Equal(MQTT5.ReasonCodes.ConnAckReasonCode.Success, connectResult.ReasonCode); + + // Wait for connack + await Task.Delay(1000).ConfigureAwait(false); + + // Socket Stuff + Assert.NotNull(client.Socket); + Assert.NotNull(client.Stream); + Assert.NotNull(client.Reader); + Assert.NotNull(client.Writer); + + // Task Stuff + Assert.NotNull(client.ConnectionWriterTask); + Assert.NotNull(client.ConnectionReaderTask); + Assert.NotNull(client.ReceivedPacketsHandlerTask); + Assert.NotNull(client.ConnectionMonitorTask); + + Assert.Equal(TaskStatus.WaitingForActivation, client.ConnectionWriterTask.Status); + Assert.Equal(TaskStatus.WaitingForActivation, client.ConnectionReaderTask.Status); + Assert.Equal(TaskStatus.Running, client.ReceivedPacketsHandlerTask.Status); + Assert.Equal(TaskStatus.WaitingForActivation, client.ConnectionMonitorTask.Status); + + // Queues + Assert.NotNull(client.SendQueue); + Assert.NotNull(client.ReceivedQueue); + + // State + Assert.Equal(ConnectState.Connected, client.ConnectState); + + // ************************************* + // Do some stuff and validate internals again + // ************************************* + + // Publish QoS 0 (At most once delivery) + _ = await client.PublishAsync("tests/ClientTest", new string("♚ ♛ ♜ ♝ ♞ ♟ ♔ ♕ ♖ ♗ ♘ ♙")).ConfigureAwait(false); + + var subResult = await client.SubscribeAsync( + "tests/ClientTest", + MQTT5.Types.QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + + // Publish QoS 1 (At least once delivery) + var pubResult = await client.PublishAsync( + "tests/ClientTest", + new string("♚ ♛ ♜ ♝ ♞ ♟ ♔ ♕ ♖ ♗ ♘ ♙"), + MQTT5.Types.QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + + // Socket Stuff + Assert.NotNull(client.Socket); + Assert.NotNull(client.Stream); + Assert.NotNull(client.Reader); + Assert.NotNull(client.Writer); + + // Task Stuff + Assert.NotNull(client.ConnectionWriterTask); + Assert.NotNull(client.ConnectionReaderTask); + Assert.NotNull(client.ReceivedPacketsHandlerTask); + Assert.NotNull(client.ConnectionMonitorTask); + + Assert.Equal(TaskStatus.WaitingForActivation, client.ConnectionWriterTask.Status); + Assert.Equal(TaskStatus.WaitingForActivation, client.ConnectionReaderTask.Status); + Assert.Equal(TaskStatus.Running, client.ReceivedPacketsHandlerTask.Status); + Assert.Equal(TaskStatus.WaitingForActivation, client.ConnectionMonitorTask.Status); + + // Queues + Assert.NotNull(client.SendQueue); + Assert.NotNull(client.ReceivedQueue); + + // ************************************* + // Disconnect and validate internals again + // ************************************* + var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false); + + // Wait for disconnect to take affect + await Task.Delay(1000).ConfigureAwait(false); + + // Socket Stuff + Assert.Null(client.Socket); + Assert.Null(client.Stream); + Assert.Null(client.Reader); + Assert.Null(client.Writer); + + // Task Stuff + Assert.Null(client.ConnectionWriterTask); + Assert.Null(client.ConnectionReaderTask); + Assert.Null(client.ReceivedPacketsHandlerTask); + Assert.Null(client.ConnectionMonitorTask); + + // Queues + Assert.NotNull(client.SendQueue); + Assert.NotNull(client.ReceivedQueue); + + // State + Assert.Equal(ConnectState.Disconnected, client.ConnectState); + } } diff --git a/Tests/HiveMQtt.Test/HiveMQClient/ConnectOptions.cs b/Tests/HiveMQtt.Test/HiveMQClient/ConnectOptions.cs index 1e5d8835..25ccb6dd 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/ConnectOptions.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/ConnectOptions.cs @@ -147,30 +147,4 @@ public async Task Request_Response_Information_Async() var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false); Assert.True(disconnectResult); } - - [Fact] - public async Task Request_Problem_Information_Async() - { - var options = new HiveMQClientOptions - { - RequestProblemInformation = true, - ClientMaximumPacketSize = 0, - }; - - var client = new HiveMQClient(options); - Assert.Equal(true, client.Options.RequestProblemInformation); - - var connectResult = await client.ConnectAsync().ConfigureAwait(false); - Assert.True(connectResult.ReasonCode == ConnAckReasonCode.ProtocolError); - - // FIXME: Broker doesn't return ReasonString for successful connections - // Make a better test with a failure scenario - Assert.NotNull(connectResult.ReasonString); - Assert.Equal("Sent CONNECT with packet size set to '0'. This is a protocol violation.", connectResult.ReasonString); - - Assert.False(client.IsConnected()); - } - - // FIXME: Add Authentication Tests - // AuthenticationMethod/Data } diff --git a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs index b70d9b3b..062c98bf 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs @@ -1,5 +1,6 @@ namespace HiveMQtt.Test.HiveMQClient; +using HiveMQtt.Client.Exceptions; using HiveMQtt.Client.Options; using Xunit; @@ -61,8 +62,9 @@ public void WithBadClientMaximumPacketSize() { ClientMaximumPacketSize = -300, }; - options.Validate(); - Assert.Equal(0, options.ClientMaximumPacketSize); + + var exception = Assert.Throws(() => options.Validate()); + Assert.Contains("Client Maximum Packet Size must be greater than 0.", exception.Message); options.ClientMaximumPacketSize = long.MaxValue; options.Validate(); diff --git a/Tests/HiveMQtt.Test/HiveMQtt.Test.csproj b/Tests/HiveMQtt.Test/HiveMQtt.Test.csproj index 9601b084..8639ba11 100644 --- a/Tests/HiveMQtt.Test/HiveMQtt.Test.csproj +++ b/Tests/HiveMQtt.Test/HiveMQtt.Test.csproj @@ -13,6 +13,10 @@ + + + + diff --git a/Tests/HiveMQtt.Test/xunit.runner.json b/Tests/HiveMQtt.Test/xunit.runner.json new file mode 100644 index 00000000..4661eb77 --- /dev/null +++ b/Tests/HiveMQtt.Test/xunit.runner.json @@ -0,0 +1,16 @@ +{ + "parallelizeAssembly": false, + "parallelizeTestCollections": false, + "execution": { + "maxParallelThreads": 1, + "maxParallelAssemblies": 1, + "maxParallelTestCollections": 1, + "parallelizeTestCollections": false, + "disableParallelization": true, + "stopOnFail": false + }, + "timing": { + "before": 1000 + } +} +