From e9c57eb9e71b03080443122b1b2027472c65889b Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 28 May 2024 13:08:10 +0200 Subject: [PATCH] Fix Connection Check in Publish Writer Task (#165) --- Source/HiveMQtt/Client/HiveMQClientEvents.cs | 2 +- .../Client/HiveMQClientTrafficProcessor.cs | 6 +- Tests/.editorconfig | 9 +- .../HiveMQtt.Test/HiveMQClient/ClientTest.cs | 2 + Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs | 2 +- .../LastWillAndTestamentBuilderTest.cs | 2 - .../Operational/QueuedPublishesTest.cs | 119 ++++++++++++++++++ .../HiveMQtt.Test/HiveMQClient/PubSubTest.cs | 8 +- .../HiveMQtt.Test/HiveMQClient/PublishTest.cs | 12 +- .../HiveMQClient/SubscribeBuilderTest.cs | 21 +++- 10 files changed, 160 insertions(+), 23 deletions(-) create mode 100644 Tests/HiveMQtt.Test/HiveMQClient/Operational/QueuedPublishesTest.cs diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs index 8a13baf8..44f8d402 100644 --- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs +++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs @@ -254,7 +254,7 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) { if (t.IsFaulted) { - Logger.Error("per-subscription MessageReceivedEventLauncher exception: " + t.Exception.Message); + Logger.Error($"per-subscription MessageReceivedEventLauncher faulted ({packet.Message.Topic}): " + t.Exception.Message); } }, TaskScheduler.Default); diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index 04872f41..b4530796 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -118,10 +118,10 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationTo break; } - while (this.ConnectState == ConnectState.Disconnected) + while (this.ConnectState != ConnectState.Connected) { Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect..."); - await Task.Delay(2000).ConfigureAwait(false); + await Task.Delay(1000).ConfigureAwait(false); continue; } @@ -188,6 +188,8 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => break; } + // We allow this task to run in Connecting, Connected, and Disconnecting states + // because it is the one that has to send the CONNECT and DISCONNECT packets. while (this.ConnectState == ConnectState.Disconnected) { Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect..."); diff --git a/Tests/.editorconfig b/Tests/.editorconfig index c9104ee9..aaba5b68 100644 --- a/Tests/.editorconfig +++ b/Tests/.editorconfig @@ -39,4 +39,11 @@ dotnet_diagnostic.CA1062.severity = none # https://docs.microsoft.com/en-us/visualstudio/code-quality/ca1707 dotnet_diagnostic.CA1707.severity = none -dotnet_diagnostic.CS1591.severity = none \ No newline at end of file +dotnet_diagnostic.CS1591.severity = none + +# VSTHRD101: Avoid unsupported async delegates +dotnet_diagnostic.VSTHRD101.severity = silent + + +# VSTHRD101: Avoid unsupported async delegates +dotnet_diagnostic.VSTHRD101.severity = suggestion diff --git a/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs index 09a20a46..8d1168a4 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/ClientTest.cs @@ -109,6 +109,8 @@ public async Task ClientStateAsync() // Publish QoS 0 (At most once delivery) _ = await client.PublishAsync("tests/ClientTest", new string("♚ ♛ ♜ ♝ ♞ ♟ ♔ ♕ ♖ ♗ ♘ ♙")).ConfigureAwait(false); + client.OnMessageReceived += (sender, args) => { }; + var subResult = await client.SubscribeAsync( "tests/ClientTest", MQTT5.Types.QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs index 8d7b18b1..1501ed58 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs @@ -24,7 +24,7 @@ public async Task Last_Will_With_Properties_Async() // Set the event handler for the message received event listenerClient.OnMessageReceived += (sender, args) => { - messagesReceived++; + Interlocked.Increment(ref messagesReceived); Assert.Equal(QualityOfService.AtLeastOnceDelivery, args.PublishMessage.QoS); Assert.Equal("last/will2", args.PublishMessage.Topic); Assert.Equal("last will message", args.PublishMessage.PayloadAsString); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs index f21e18a7..c637ee18 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs @@ -39,7 +39,6 @@ public async Task Last_Will_With_Properties_Async() Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); Assert.True(listenerClient.IsConnected()); - var messagesReceived = 0; var taskLWTReceived = new TaskCompletionSource(); #pragma warning disable SA1010 // Opening square brackets should be spaced correctly byte[] correlationDataBytes = [1, 2, 3, 4, 5]; @@ -48,7 +47,6 @@ public async Task Last_Will_With_Properties_Async() // Set the event handler for the message received event listenerClient.OnMessageReceived += (sender, args) => { - messagesReceived++; Assert.Equal(QualityOfService.AtLeastOnceDelivery, args.PublishMessage.QoS); Assert.Equal("last/will7", args.PublishMessage.Topic); Assert.Equal("last will message", args.PublishMessage.PayloadAsString); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/Operational/QueuedPublishesTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/Operational/QueuedPublishesTest.cs new file mode 100644 index 00000000..9f5000dd --- /dev/null +++ b/Tests/HiveMQtt.Test/HiveMQClient/Operational/QueuedPublishesTest.cs @@ -0,0 +1,119 @@ +namespace HiveMQtt.Test.HiveMQClient; + +using System.Text; +using HiveMQtt.Client; +using HiveMQtt.MQTT5.Types; +using Xunit; + +public class QueuedPublishesTest +{ + [Fact] + public async Task Queued_Messages_Chain_Async() + { + + var batchSize = 1000; + + var tasks = new[] + { + Task.Run(this.RelayClientAsync), + Task.Run(this.ReceiverClientAsync), + Task.Run(this.PublisherClientAsync), + }; + + var results = await Task.WhenAll(tasks).ConfigureAwait(false); + Assert.Equal(batchSize, results[0]); + Assert.Equal(batchSize, results[1]); + Assert.Equal(batchSize, results[2]); + } + + private async Task PublisherClientAsync() + { + var batchSize = 1000; + var firstTopic = "hmq-tests-qmc/q1"; + + /////////////////////////////////////////////////////////////// + // Publish 1000 messages with an incrementing payload + /////////////////////////////////////////////////////////////// + var publisherOptions = new HiveMQClientOptionsBuilder() + .WithClientId("hmq-tests-qmc/q1-publisher") + .WithCleanStart(false) + .WithSessionExpiryInterval(40000) + .Build(); + var publishClient = new HiveMQClient(publisherOptions); + await publishClient.ConnectAsync().ConfigureAwait(false); + + // Wait for 1 second to allow other tasks to subscribe + await Task.Delay(1000).ConfigureAwait(false); + + for (var i = 0; i < batchSize; i++) + { + // Make a JSON string payload with the current number + var payload = Encoding.UTF8.GetBytes($"{{\"number\":{i}}}"); + + // Publish the message to the topic "hmq-tests/q1" with exactly once delivery + await publishClient.PublishAsync(firstTopic, payload, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + } + + return batchSize; + } + + private async Task RelayClientAsync() + { + var firstTopic = "hmq-tests-qmc/q1"; + var secondTopic = "hmq-tests-qmc/q2"; + + //////////////////////////////////////////////////////////////////////////// + // Subscribe to the first topic and relay the messages to a second topic + //////////////////////////////////////////////////////////////////////////// + var subscriberOptions = new HiveMQClientOptionsBuilder() + .WithClientId("hmq-tests-qmc/q1-q2-relay") + .WithCleanStart(false) + .WithSessionExpiryInterval(40000) + .Build(); + var subscribeClient = new HiveMQClient(subscriberOptions); + + var relayCount = 0; + subscribeClient.OnMessageReceived += async (sender, args) => + { + // Republish the Message to the second topic + var payload = args.PublishMessage.Payload; + var publishResult = await subscribeClient.PublishAsync(secondTopic, payload, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + Assert.NotNull(publishResult.QoS2ReasonCode); + + // Atomically increment the relayCount + Interlocked.Increment(ref relayCount); + }; + + await subscribeClient.ConnectAsync().ConfigureAwait(false); + await subscribeClient.SubscribeAsync(firstTopic, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + + // Wait until all messages are relayed + await Task.Delay(5000).ConfigureAwait(false); + return relayCount; + } + + private async Task ReceiverClientAsync() + { + var secondTopic = "hmq-tests-qmc/q2"; + + //////////////////////////////////////////////////////////////////////////// + // Subscribe to the second topic and count the received messages + //////////////////////////////////////////////////////////////////////////// + var receiverOptions = new HiveMQClientOptionsBuilder() + .WithClientId("hmq-tests-qmc/q2-receiver") + .WithCleanStart(false) + .WithSessionExpiryInterval(40000) + .Build(); + var receiverClient = new HiveMQClient(receiverOptions); + + var receivedCount = 0; + receiverClient.OnMessageReceived += (sender, args) => Interlocked.Increment(ref receivedCount); + + await receiverClient.ConnectAsync().ConfigureAwait(false); + await receiverClient.SubscribeAsync(secondTopic, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + + // Wait for the receiver to receive all messages + await Task.Delay(5000).ConfigureAwait(false); + return receivedCount; + } +} diff --git a/Tests/HiveMQtt.Test/HiveMQClient/PubSubTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/PubSubTest.cs index 094e9046..d4adc329 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/PubSubTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/PubSubTest.cs @@ -69,12 +69,12 @@ public async Task QoS1PubSubAsync() // Set the event handler for the message received event client.OnMessageReceived += (sender, args) => { - messagesReceived++; Assert.Equal(QualityOfService.AtLeastOnceDelivery, args.PublishMessage.QoS); Assert.Equal(testTopic, args.PublishMessage.Topic); Assert.Equal(testPayload, args.PublishMessage.PayloadAsString); - if (messagesReceived >= 5) + Interlocked.Increment(ref messagesReceived); + if (messagesReceived == 10 && taskCompletionSource.Task.IsCompleted == false) { taskCompletionSource.SetResult(true); } @@ -83,7 +83,7 @@ public async Task QoS1PubSubAsync() Client.Results.PublishResult result; // Publish 10 messages - for (var i = 0; i < 5; i++) + for (var i = 0; i < 10; i++) { result = await client.PublishAsync(testTopic, testPayload, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); Assert.IsType(result); @@ -115,7 +115,7 @@ public async Task QoS2PubSubAsync() // Set the event handler for the message received event client.OnMessageReceived += (sender, args) => { - messagesReceived++; + Interlocked.Increment(ref messagesReceived); Assert.Equal(QualityOfService.ExactlyOnceDelivery, args.PublishMessage.QoS); Assert.Equal(testTopic, args.PublishMessage.Topic); Assert.Equal(testPayload, args.PublishMessage.PayloadAsString); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs index b0f34c96..6c11f63d 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs @@ -204,7 +204,7 @@ public async Task ThreeNodeQoS0ChainedPublishesAsync() #pragma warning disable VSTHRD100 // Avoid async void methods async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) { - client2MessageCount++; + Interlocked.Increment(ref client2MessageCount); if (sender is HiveMQClient client) { var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtMostOnceDelivery).ConfigureAwait(true); @@ -223,7 +223,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even #pragma warning disable VSTHRD100 // Avoid async void methods async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) { - client3MessageCount++; + Interlocked.Increment(ref client3MessageCount); Assert.NotNull(eventArgs.PublishMessage); Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString); } @@ -295,7 +295,7 @@ public async Task ThreeNodeQoS1ChainedPublishesAsync() #pragma warning disable VSTHRD100 // Avoid async void methods async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) { - client2MessageCount++; + Interlocked.Increment(ref client2MessageCount); if (sender is HiveMQClient client) { var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); @@ -315,7 +315,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even // client 3 will receive the final message void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) { - client3MessageCount++; + Interlocked.Increment(ref client3MessageCount); Assert.NotNull(eventArgs.PublishMessage); Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString); } @@ -386,7 +386,7 @@ public async Task ThreeNodeQoS2ChainedPublishesAsync() #pragma warning disable VSTHRD100 // Avoid async void methods async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) { - client2MessageCount++; + Interlocked.Increment(ref client2MessageCount); var client = sender as HiveMQClient; #pragma warning disable CS8602 // Dereference of a possibly null reference. var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(true); @@ -405,7 +405,7 @@ async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs even var client3MessageCount = 0; void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) { - client3MessageCount++; + Interlocked.Increment(ref client3MessageCount); Assert.NotNull(eventArgs.PublishMessage); Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString); } diff --git a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs index 86914b5f..891533bb 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/SubscribeBuilderTest.cs @@ -176,26 +176,35 @@ public async Task PerSubHandlerWithSingleLevelWildcardAsync() var subscribeOptions = new SubscribeOptionsBuilder() .WithSubscription("tests/PerSubHandlerWithSingleLevelWildcard/+/msg", MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) => { - messageCount++; var pattern = @"^tests/PerSubHandlerWithSingleLevelWildcard/[0-2]/msg$"; var regex = new Regex(pattern); Assert.Matches(regex, args.PublishMessage.Topic); Assert.Equal("test", args.PublishMessage.PayloadAsString); + Interlocked.Increment(ref messageCount); if (messageCount == 3) { if (args.PublishMessage.Topic == "tests/PerSubHandlerWithSingleLevelWildcard/0/msg") { - tcs1.SetResult(true); + if (!tcs1.Task.IsCompleted) + { + tcs1.SetResult(true); + } } else if (args.PublishMessage.Topic == "tests/PerSubHandlerWithSingleLevelWildcard/1/msg") { - tcs2.SetResult(true); + if (!tcs2.Task.IsCompleted) + { + tcs2.SetResult(true); + } } else if (args.PublishMessage.Topic == "tests/PerSubHandlerWithSingleLevelWildcard/2/msg") { - tcs3.SetResult(true); + if (!tcs3.Task.IsCompleted) + { + tcs3.SetResult(true); + } } } }) @@ -245,14 +254,14 @@ public async Task PerSubHandlerWithMultiLevelWildcardAsync() MQTT5.Types.QualityOfService.AtLeastOnceDelivery, messageReceivedHandler: (sender, args) => { - messageCount++; var pattern = @"\Atests/PerSubHandlerWithMultiLevelWildcard/(/?|.+)\z"; var regex = new Regex(pattern); Assert.Matches(regex, args.PublishMessage.Topic); Assert.Equal("test", args.PublishMessage.PayloadAsString); - if (messageCount == 3) + Interlocked.Increment(ref messageCount); + if (messageCount == 3 && !tcs.Task.IsCompleted) { tcs.SetResult(true); }