From d043390cd1534a4659462c282c3875e70c41720d Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Thu, 16 May 2024 16:38:14 +0200 Subject: [PATCH] Event Handling & QoS 2 Improvements (#157) --- .../docs/how-to/configure-logging.md | 1 + .../Exceptions/HiveMQttClientException.cs | 7 - Source/HiveMQtt/Client/HiveMQClient.cs | 53 ++- Source/HiveMQtt/Client/HiveMQClientEvents.cs | 243 ++++++++-- Source/HiveMQtt/Client/HiveMQClientSocket.cs | 4 + .../Client/HiveMQClientTrafficProcessor.cs | 437 +++++++++++------- Source/HiveMQtt/Client/HiveMQClientUtil.cs | 3 - .../HiveMQtt.Test/HiveMQClient/ConnectTest.cs | 2 + Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs | 6 +- .../LastWillAndTestamentBuilderTest.cs | 5 +- .../HiveMQtt.Test/HiveMQClient/PublishTest.cs | 276 ++++++++++- .../HiveMQClient/UnsubscribeBuilderTest.cs | 1 - 12 files changed, 803 insertions(+), 235 deletions(-) diff --git a/Documentation/docs/how-to/configure-logging.md b/Documentation/docs/how-to/configure-logging.md index 2f207bcb..03660abc 100644 --- a/Documentation/docs/how-to/configure-logging.md +++ b/Documentation/docs/how-to/configure-logging.md @@ -25,6 +25,7 @@ Setting `minlevel` to `Trace` will output all activity in the HiveMQtt package d ```log 2024-03-14 15:40:18.2252|TRACE|HiveMQtt.Client.HiveMQClient|Trace Level Logging Legend: 2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(W)- == ConnectionWriter +2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(PW)- == ConnectionPublishWriter 2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(R)- == ConnectionReader 2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(RPH)- == ReceivedPacketsHandler 2024-03-14 15:40:18.2320|INFO|HiveMQtt.Client.HiveMQClient|Connecting to broker at 127.0.0.1:1883 diff --git a/Source/HiveMQtt/Client/Exceptions/HiveMQttClientException.cs b/Source/HiveMQtt/Client/Exceptions/HiveMQttClientException.cs index 76f1d20f..86cfb5e0 100644 --- a/Source/HiveMQtt/Client/Exceptions/HiveMQttClientException.cs +++ b/Source/HiveMQtt/Client/Exceptions/HiveMQttClientException.cs @@ -33,11 +33,4 @@ public HiveMQttClientException(string message, Exception inner) : base(message, inner) { } - - protected HiveMQttClientException( - System.Runtime.Serialization.SerializationInfo info, - System.Runtime.Serialization.StreamingContext context) - : base(info, context) - { - } } diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index b1228938..ca9c5a2a 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -50,6 +50,7 @@ public HiveMQClient(HiveMQClientOptions? options = null) Logger.Trace("Trace Level Logging Legend:"); Logger.Trace(" -(W)- == ConnectionWriter"); + Logger.Trace(" -(PW)- == ConnectionPublishWriter"); Logger.Trace(" -(R)- == ConnectionReader"); Logger.Trace(" -(CM)- == ConnectionMonitor"); Logger.Trace(" -(RPH)- == ReceivedPacketsHandler"); @@ -91,7 +92,7 @@ public async Task ConnectAsync() // Construct the MQTT Connect packet and queue to send var connPacket = new ConnectPacket(this.Options); Logger.Trace($"Queuing packet for send: {connPacket}"); - this.SendQueue.Add(connPacket); + this.SendQueue.Enqueue(connPacket); ConnAckPacket connAck; ConnectResult connectResult; @@ -162,11 +163,11 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) this.OnDisconnectSent += eventHandler; Logger.Trace($"Queuing packet for send: {disconnectPacket}"); - this.SendQueue.Add(disconnectPacket); + this.SendQueue.Enqueue(disconnectPacket); try { - disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); + disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); } catch (TimeoutException) { @@ -200,7 +201,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) if (message.QoS == QualityOfService.AtMostOnceDelivery) { Logger.Trace($"Queuing packet for send: {publishPacket}"); - this.SendQueue.Add(publishPacket); + this.OutgoingPublishQueue.Enqueue(publishPacket); return new PublishResult(publishPacket.Message); } else if (message.QoS == QualityOfService.AtLeastOnceDelivery) @@ -211,11 +212,10 @@ public async Task PublishAsync(MQTT5PublishMessage message) EventHandler eventHandler = TaskHandler; publishPacket.OnPublishQoS1Complete += eventHandler; - // Construct the MQTT Connect packet and queue to send Logger.Trace($"Queuing packet for send: {publishPacket}"); - this.SendQueue.Add(publishPacket); + this.OutgoingPublishQueue.Enqueue(publishPacket); - var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); + var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); publishPacket.OnPublishQoS1Complete -= eventHandler; return new PublishResult(publishPacket.Message, pubAckPacket); @@ -229,20 +229,31 @@ public async Task PublishAsync(MQTT5PublishMessage message) EventHandler eventHandler = TaskHandler; publishPacket.OnPublishQoS2Complete += eventHandler; - // Construct the MQTT Connect packet and queue to send - this.SendQueue.Add(publishPacket); + Logger.Trace($"Queuing packet for send: {publishPacket}"); + this.OutgoingPublishQueue.Enqueue(publishPacket); List packetList; try { // Wait on the QoS 2 handshake - packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); + // FIXME: Timeout value + packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); } catch (TimeoutException) { Logger.Error("PublishAsync: QoS 2 timeout. No response received in time."); - publishResult = new PublishResult(publishPacket.Message); - publishResult.QoS2ReasonCode = null; + + // Remove the transaction chain + if (this.transactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain)) + { + Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}."); + } + + // Prepare PublishResult + publishResult = new PublishResult(publishPacket.Message) + { + QoS2ReasonCode = null, + }; publishPacket.OnPublishQoS2Complete -= eventHandler; return publishResult; } @@ -329,13 +340,13 @@ public async Task SubscribeAsync(SubscribeOptions options) this.OnSubAckReceived += eventHandler; // Queue the constructed packet to be sent on the wire - this.SendQueue.Add(subscribePacket); + this.SendQueue.Enqueue(subscribePacket); SubAckPacket subAck; SubscribeResult subscribeResult; try { - subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); + subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); } catch (TimeoutException) { @@ -438,14 +449,14 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp EventHandler eventHandler = TaskHandler; this.OnUnsubAckReceived += eventHandler; - this.SendQueue.Add(unsubscribePacket); + this.SendQueue.Enqueue(unsubscribePacket); // FIXME: Cancellation token and better timeout value UnsubAckPacket unsubAck; UnsubscribeResult unsubscribeResult; try { - unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); + unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); // FIXME: Validate that the packet identifier matches } @@ -497,15 +508,19 @@ private async Task HandleDisconnectionAsync(bool clean = true) if (clean) { - if (this.SendQueue.Count > 0) + if (!this.SendQueue.IsEmpty) { Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting."); } - // We only clear the send queue on explicit disconnect - while (this.SendQueue.TryTake(out _)) + if (!this.OutgoingPublishQueue.IsEmpty) { + Logger.Warn($"HandleDisconnection: Outgoing publish queue not empty. {this.OutgoingPublishQueue.Count} packets pending but we are disconnecting."); } + + // We only clear the queues on explicit disconnect + this.SendQueue.Clear(); + this.OutgoingPublishQueue.Clear(); } // Fire the corresponding after event diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs index 02332c47..e4ece8de 100644 --- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs +++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs @@ -38,7 +38,13 @@ protected virtual void BeforeConnectEventLauncher(HiveMQClientOptions options) { var eventArgs = new BeforeConnectEventArgs(options); Logger.Trace("BeforeConnectEventLauncher"); - this.BeforeConnect?.Invoke(this, eventArgs); + _ = Task.Run(() => this.BeforeConnect?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("BeforeConnectEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -50,7 +56,13 @@ protected virtual void AfterConnectEventLauncher(ConnectResult results) { var eventArgs = new AfterConnectEventArgs(results); Logger.Trace("AfterConnectEventLauncher"); - this.AfterConnect?.Invoke(this, eventArgs); + _ = Task.Run(() => this.AfterConnect?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("AfterConnectEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -62,7 +74,13 @@ protected virtual void BeforeDisconnectEventLauncher() { var eventArgs = new BeforeDisconnectEventArgs(); Logger.Trace("BeforeDisconnectEventLauncher"); - this.BeforeDisconnect?.Invoke(this, eventArgs); + _ = Task.Run(() => this.BeforeDisconnect?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("BeforeDisconnectEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -74,7 +92,13 @@ protected virtual void AfterDisconnectEventLauncher(bool clean = false) { var eventArgs = new AfterDisconnectEventArgs(clean); Logger.Trace("AfterDisconnectEventLauncher"); - this.AfterDisconnect?.Invoke(this, eventArgs); + _ = Task.Run(() => this.AfterDisconnect?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("AfterDisconnectEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -86,7 +110,13 @@ protected virtual void BeforeSubscribeEventLauncher(SubscribeOptions options) { var eventArgs = new BeforeSubscribeEventArgs(options); Logger.Trace("BeforeSubscribeEventLauncher"); - this.BeforeSubscribe?.Invoke(this, eventArgs); + _ = Task.Run(() => this.BeforeSubscribe?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("BeforeSubscribeEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -98,7 +128,13 @@ protected virtual void AfterSubscribeEventLauncher(SubscribeResult results) { var eventArgs = new AfterSubscribeEventArgs(results); Logger.Trace("AfterSubscribeEventLauncher"); - this.AfterSubscribe?.Invoke(this, eventArgs); + _ = Task.Run(() => this.AfterSubscribe?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("AfterSubscribeEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -110,7 +146,13 @@ protected virtual void BeforeUnsubscribeEventLauncher(List subscri { var eventArgs = new BeforeUnsubscribeEventArgs(subscriptions); Logger.Trace("BeforeUnsubscribeEventLauncher"); - this.BeforeUnsubscribe?.Invoke(this, eventArgs); + _ = Task.Run(() => this.BeforeUnsubscribe?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("BeforeUnsubscribeEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -122,7 +164,13 @@ protected virtual void AfterUnsubscribeEventLauncher(UnsubscribeResult results) { var eventArgs = new AfterUnsubscribeEventArgs(results); Logger.Trace("AfterUnsubscribeEventLauncher"); - this.AfterUnsubscribe?.Invoke(this, eventArgs); + _ = Task.Run(() => this.AfterUnsubscribe?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("AfterUnsubscribeEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -136,14 +184,29 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) Logger.Trace("OnMessageReceivedEventLauncher"); // Global Event Handler - this.OnMessageReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnMessageReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnMessageReceivedEventLauncher exception: " + t.Exception.Message); + } + }); // Per Subscription Event Handler foreach (var subscription in this.Subscriptions) { if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic)) { - subscription.MessageReceivedHandler?.Invoke(this, eventArgs); + if (subscription.MessageReceivedHandler != null) + { + _ = Task.Run(() => subscription.MessageReceivedHandler?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("per-subscription OnMessageReceivedEventLauncher exception: " + t.Exception.Message); + } + }); + } } } } @@ -161,7 +224,13 @@ protected virtual void OnConnectSentEventLauncher(ConnectPacket packet) { var eventArgs = new OnConnectSentEventArgs(packet); Logger.Trace("OnConnectSentEventLauncher"); - this.OnConnectSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnConnectSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnConnectSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -173,7 +242,13 @@ protected virtual void OnConnAckReceivedEventLauncher(ConnAckPacket packet) { var eventArgs = new OnConnAckReceivedEventArgs(packet); Logger.Trace("OnConnAckReceivedEventLauncher"); - this.OnConnAckReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnConnAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnConnAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -185,7 +260,13 @@ protected virtual void OnDisconnectSentEventLauncher(DisconnectPacket packet) { var eventArgs = new OnDisconnectSentEventArgs(packet); Logger.Trace("OnDisconnectSentEventLauncher"); - this.OnDisconnectSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnDisconnectSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnDisconnectSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -197,7 +278,13 @@ protected virtual void OnDisconnectReceivedEventLauncher(DisconnectPacket packet { var eventArgs = new OnDisconnectReceivedEventArgs(packet); Logger.Trace("OnDisconnectReceivedEventLauncher: ReasonCode: " + packet.DisconnectReasonCode + " ReasonString: " + packet.Properties.ReasonString); - this.OnDisconnectReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnDisconnectReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnDisconnectReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -209,7 +296,13 @@ protected virtual void OnPingReqSentEventLauncher(PingReqPacket packet) { var eventArgs = new OnPingReqSentEventArgs(packet); Logger.Trace("OnPingReqSentEventLauncher"); - this.OnPingReqSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPingReqSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPingReqSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -221,7 +314,13 @@ protected virtual void OnPingRespReceivedEventLauncher(PingRespPacket packet) { var eventArgs = new OnPingRespReceivedEventArgs(packet); Logger.Trace("OnPingRespReceivedEventLauncher"); - this.OnPingRespReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPingRespReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPingRespReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -233,7 +332,13 @@ protected virtual void OnSubscribeSentEventLauncher(SubscribePacket packet) { var eventArgs = new OnSubscribeSentEventArgs(packet); Logger.Trace("OnSubscribeSentEventLauncher"); - this.OnSubscribeSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnSubscribeSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnSubscribeSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -245,7 +350,13 @@ protected virtual void OnSubAckReceivedEventLauncher(SubAckPacket packet) { var eventArgs = new OnSubAckReceivedEventArgs(packet); Logger.Trace("OnSubAckReceivedEventLauncher"); - this.OnSubAckReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnSubAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnSubAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -257,7 +368,13 @@ protected virtual void OnUnsubscribeSentEventLauncher(UnsubscribePacket packet) { var eventArgs = new OnUnsubscribeSentEventArgs(packet); Logger.Trace("OnUnsubscribeSentEventLauncher"); - this.OnUnsubscribeSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnUnsubscribeSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnUnsubscribeSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -269,7 +386,13 @@ protected virtual void OnUnsubAckReceivedEventLauncher(UnsubAckPacket packet) { var eventArgs = new OnUnsubAckReceivedEventArgs(packet); Logger.Trace("OnUnsubAckReceivedEventLauncher"); - this.OnUnsubAckReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnUnsubAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnUnsubAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -281,7 +404,13 @@ protected virtual void OnPublishReceivedEventLauncher(PublishPacket packet) { var eventArgs = new OnPublishReceivedEventArgs(packet); Logger.Trace("OnPublishReceivedEventLauncher"); - this.OnPublishReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPublishReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPublishReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -293,7 +422,13 @@ protected virtual void OnPublishSentEventLauncher(PublishPacket packet) { var eventArgs = new OnPublishSentEventArgs(packet); Logger.Trace("OnPublishSentEventLauncher"); - this.OnPublishSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPublishSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPublishSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -305,7 +440,13 @@ protected virtual void OnPubAckReceivedEventLauncher(PubAckPacket packet) { var eventArgs = new OnPubAckReceivedEventArgs(packet); Logger.Trace("OnPubAckReceivedEventLauncher"); - this.OnPubAckReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPubAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -317,7 +458,13 @@ protected virtual void OnPubAckSentEventLauncher(PubAckPacket packet) { var eventArgs = new OnPubAckSentEventArgs(packet); Logger.Trace("OnPubAckSentEventLauncher"); - this.OnPubAckSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubAckSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPubAckSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -329,7 +476,13 @@ protected virtual void OnPubRecReceivedEventLauncher(PubRecPacket packet) { var eventArgs = new OnPubRecReceivedEventArgs(packet); Logger.Trace("OnPubRecReceivedEventLauncher"); - this.OnPubRecReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubRecReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPubRecReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -341,7 +494,13 @@ protected virtual void OnPubRecSentEventLauncher(PubRecPacket packet) { var eventArgs = new OnPubRecSentEventArgs(packet); Logger.Trace("OnPubRecSentEventLauncher"); - this.OnPubRecSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubRecSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPubRecSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -353,7 +512,13 @@ protected virtual void OnPubRelReceivedEventLauncher(PubRelPacket packet) { var eventArgs = new OnPubRelReceivedEventArgs(packet); Logger.Trace("OnPubRelReceivedEventLauncher"); - this.OnPubRelReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubRelReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPubRelReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -365,7 +530,13 @@ protected virtual void OnPubRelSentEventLauncher(PubRelPacket packet) { var eventArgs = new OnPubRelSentEventArgs(packet); Logger.Trace("OnPubRelSentEventLauncher"); - this.OnPubRelSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubRelSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("OnPubRelSentEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -377,7 +548,13 @@ protected virtual void OnPubCompReceivedEventLauncher(PubCompPacket packet) { var eventArgs = new OnPubCompReceivedEventArgs(packet); Logger.Trace("PubCompReceivedEventLauncher"); - this.OnPubCompReceived?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubCompReceived?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("PubCompReceivedEventLauncher exception: " + t.Exception.Message); + } + }); } /// @@ -389,6 +566,12 @@ protected virtual void OnPubCompSentEventLauncher(PubCompPacket packet) { var eventArgs = new OnPubCompSentEventArgs(packet); Logger.Trace("PubCompSentEventLauncher"); - this.OnPubCompSent?.Invoke(this, eventArgs); + _ = Task.Run(() => this.OnPubCompSent?.Invoke(this, eventArgs)).ContinueWith(t => + { + if (t.IsFaulted) + { + Logger.Error("PubCompSentEventLauncher exception: " + t.Exception.Message); + } + }); } } diff --git a/Source/HiveMQtt/Client/HiveMQClientSocket.cs b/Source/HiveMQtt/Client/HiveMQClientSocket.cs index 689dc7ee..3ad80e44 100644 --- a/Source/HiveMQtt/Client/HiveMQClientSocket.cs +++ b/Source/HiveMQtt/Client/HiveMQClientSocket.cs @@ -40,6 +40,8 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient private CancellationTokenSource cancellationTokenSource; + internal Task? ConnectionPublishWriterTask { get; set; } + internal Task? ConnectionWriterTask { get; set; } internal Task? ConnectionReaderTask { get; set; } @@ -180,6 +182,7 @@ internal async Task ConnectSocketAsync() this.cancellationTokenSource = new CancellationTokenSource(); // Start the traffic processors + this.ConnectionPublishWriterTask = this.ConnectionPublishWriterAsync(this.cancellationTokenSource.Token); this.ConnectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token); this.ConnectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token); this.ReceivedPacketsHandlerTask = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token); @@ -257,6 +260,7 @@ internal bool CloseSocket(bool? shutdownPipeline = true) this.cancellationTokenSource.Cancel(); // Reset the tasks + this.ConnectionPublishWriterTask = null; this.ConnectionWriterTask = null; this.ConnectionReaderTask = null; this.ReceivedPacketsHandlerTask = null; diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index c0aa1e66..520c11ff 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -26,16 +26,21 @@ namespace HiveMQtt.Client; using HiveMQtt.MQTT5; using HiveMQtt.MQTT5.Packets; using HiveMQtt.MQTT5.ReasonCodes; +using HiveMQtt.MQTT5.Types; /// public partial class HiveMQClient : IDisposable, IHiveMQClient { - internal BlockingCollection SendQueue { get; } = new(); + internal MQTT5Properties ConnectionProperties { get; set; } = new(); - internal BlockingCollection ReceivedQueue { get; } = new(); + internal ConcurrentQueue OutgoingPublishQueue { get; } = new(); + + internal ConcurrentQueue SendQueue { get; } = new(); + + internal ConcurrentQueue ReceivedQueue { get; } = new(); // Transactional packets indexed by packet identifier - private readonly ConcurrentDictionary> transactionQueue = new(); + internal readonly ConcurrentDictionary> transactionQueue = new(); private readonly Stopwatch lastCommunicationTimer = new(); @@ -66,10 +71,20 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) = { // Send PingReq Logger.Trace($"{this.Options.ClientId}-(CM)- --> PingReq"); - this.SendQueue.Add(new PingReqPacket()); + this.SendQueue.Enqueue(new PingReqPacket()); } } + // Dumping Client State + Logger.Trace($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- SendQueue:{this.SendQueue.Count} ReceivedQueue:{this.ReceivedQueue.Count} OutgoingPublishQueue:{this.OutgoingPublishQueue.Count}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- TransactionQueue:{this.transactionQueue.Count}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionMonitor:{this.ConnectionMonitorTask?.Status}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionPublishWriter:{this.ConnectionPublishWriterTask?.Status}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionWriter:{this.ConnectionWriterTask?.Status}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionReader:{this.ConnectionReaderTask?.Status}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- - ReceivedPacketsHandler:{this.ReceivedPacketsHandlerTask?.Status}"); + try { await Task.Delay(2000, cancellationToken).ConfigureAwait(false); @@ -87,131 +102,202 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) = }, cancellationToken); /// - /// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue. + /// Asynchronous background task that handles the outgoing publish packets queued in OutgoingPublishQueue. /// - private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.Run( + private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) => Task.Run( async () => { this.lastCommunicationTimer.Start(); - Logger.Trace($"{this.Options.ClientId}-(W)- Starting...{this.ConnectState}"); + Logger.Trace($"{this.Options.ClientId}-(PW)- Starting...{this.ConnectState}"); while (true) { if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.SendQueue.Count} packets remaining."); + Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled with {this.OutgoingPublishQueue.Count} publish packets remaining."); break; } while (this.ConnectState == ConnectState.Disconnected) { - Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect..."); + Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect..."); await Task.Delay(2000).ConfigureAwait(false); continue; } - Logger.Trace($"{this.Options.ClientId}-(W)- {this.SendQueue.Count} packets waiting to be sent."); + // Logger.Trace($"{this.Options.ClientId}-(PW)- {this.OutgoingPublishQueue.Count} publish packets waiting to be sent."); - var packet = this.SendQueue.Take(); - FlushResult writeResult = default; + var receiveMaximum = this.ConnectionProperties.ReceiveMaximum ?? 65535; + if (this.transactionQueue.Count >= receiveMaximum) + { + Logger.Debug($"The Maximum number of publishes have been sent to broker. Waiting for existing transactions to complete."); + await Task.Delay(2000).ConfigureAwait(false); + continue; + } - switch (packet) + if (this.OutgoingPublishQueue.TryDequeue(out var publishPacket)) { - // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. - case ConnectPacket connectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); - this.OnConnectSentEventLauncher(connectPacket); - break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); - this.OnDisconnectSentEventLauncher(disconnectPacket); - break; - case SubscribePacket subscribePacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); - this.OnSubscribeSentEventLauncher(subscribePacket); - break; - case UnsubscribePacket unsubscribePacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); - this.OnUnsubscribeSentEventLauncher(unsubscribePacket); - break; - case PublishPacket publishPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); - if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery || - publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) + FlushResult writeResult = default; + + Logger.Trace($"{this.Options.ClientId}-(PW)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}"); + if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery || + publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) + { + // QoS > 0 - Add to transaction queue + if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false) { - // QoS > 0 - Add to transaction queue - if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false) - { - Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); - continue; - } + Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); + continue; } + } - writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); - this.OnPublishSentEventLauncher(publishPacket); - break; - case PubAckPacket pubAckPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); - this.OnPubAckSentEventLauncher(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - // This is in response to a received Publish packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); - this.OnPubRecSentEventLauncher(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - // This is in response to a received PubRec packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); - this.OnPubRelSentEventLauncher(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - // This is in response to a received PubRel packet. Communication chain management - // was done in the receiver code. Just send the response. - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); - writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); - this.OnPubCompSentEventLauncher(pubCompPacket); + writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); + this.OnPublishSentEventLauncher(publishPacket); + + if (writeResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled"); break; - case PingReqPacket pingReqPacket: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); - writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); - this.OnPingReqSentEventLauncher(pingReqPacket); + } + + if (writeResult.IsCompleted) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); break; + } - /* case AuthPacket authPacket: - /* writeResult = await this.Writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false); - /* this.OnAuthSentEventLauncher(authPacket); - /* break; - */ - default: - Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); + this.lastCommunicationTimer.Restart(); + + if (this.transactionQueue.Count >= (this.ConnectionProperties.ReceiveMaximum ?? 65535)) + { break; - } // switch + } + + } + else + { + // Queue is empty + await Task.Delay(1).ConfigureAwait(false); + } // TryDequeue + } // while(true) + + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); + return true; + }, cancellationToken); - if (writeResult.IsCanceled) + /// + /// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue. + /// + private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.Run( + async () => + { + this.lastCommunicationTimer.Start(); + Logger.Trace($"{this.Options.ClientId}-(W)- Starting...{this.ConnectState}"); + + while (true) + { + if (cancellationToken.IsCancellationRequested) { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); + Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.SendQueue.Count} packets remaining."); break; } - if (writeResult.IsCompleted) + while (this.ConnectState == ConnectState.Disconnected) { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); - break; + Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect..."); + await Task.Delay(2000).ConfigureAwait(false); + continue; } - this.lastCommunicationTimer.Restart(); - } // foreach + // Logger.Trace($"{this.Options.ClientId}-(W)- {this.SendQueue.Count} packets waiting to be sent."); + + if (this.SendQueue.TryDequeue(out var packet)) + { + FlushResult writeResult = default; + + switch (packet) + { + // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. + case ConnectPacket connectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); + this.OnConnectSentEventLauncher(connectPacket); + break; + case DisconnectPacket disconnectPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); + this.OnDisconnectSentEventLauncher(disconnectPacket); + break; + case SubscribePacket subscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); + this.OnSubscribeSentEventLauncher(subscribePacket); + break; + case UnsubscribePacket unsubscribePacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); + this.OnUnsubscribeSentEventLauncher(unsubscribePacket); + break; + case PublishPacket publishPacket: + throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); + case PubAckPacket pubAckPacket: + // This is in response to a received Publish packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); + this.OnPubAckSentEventLauncher(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + // This is in response to a received Publish packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); + this.OnPubRecSentEventLauncher(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + // This is in response to a received PubRec packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); + this.OnPubRelSentEventLauncher(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + // This is in response to a received PubRel packet. Communication chain management + // was done in the receiver code. Just send the response. + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}"); + writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); + this.OnPubCompSentEventLauncher(pubCompPacket); + break; + case PingReqPacket pingReqPacket: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); + writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); + this.OnPingReqSentEventLauncher(pingReqPacket); + break; + default: + Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); + break; + } // switch + + if (writeResult.IsCanceled) + { + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled"); + break; + } + + if (writeResult.IsCompleted) + { + Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream"); + break; + } + + this.lastCommunicationTimer.Restart(); + } + else + { + // Queue is empty + await Task.Delay(1).ConfigureAwait(false); + } // TryDequeue + } // while(true) Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); return true; @@ -289,11 +375,23 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => buffer = buffer.Slice(consumed); this.Reader?.AdvanceTo(buffer.Start); - // Add the packet to the received queue for processing later - // by ReceivedPacketsHandlerAsync - Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name}. Adding to receivedQueue."); - this.ReceivedQueue.Add(decodedPacket); + // We handle disconnects immediately + if (decodedPacket is DisconnectPacket disconnectPacket) + { + Logger.Warn($"-(R)- <-- Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); + this.OnDisconnectReceivedEventLauncher(disconnectPacket); + break; + } + else + { + Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name}. Adding to receivedQueue."); + // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync + this.ReceivedQueue.Enqueue(decodedPacket); + } } // while (buffer.Length > 0 + + await Task.Yield(); } // while (this.ConnectState is ConnectState.Connecting or ConnectState.Connected) Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); @@ -318,75 +416,83 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok break; } - Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.ReceivedQueue.Count} received packets currently waiting to be processed."); - - var packet = this.ReceivedQueue.Take(); + // Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.ReceivedQueue.Count} received packets currently waiting to be processed."); - if (this.Options.ClientMaximumPacketSize != null) + if (this.ReceivedQueue.TryDequeue(out var packet)) { - if (packet.PacketSize > this.Options.ClientMaximumPacketSize) + if (this.Options.ClientMaximumPacketSize != null) { - Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - - var opts = new DisconnectOptions + if (packet.PacketSize > this.Options.ClientMaximumPacketSize) { - ReasonCode = DisconnectReasonCode.PacketTooLarge, - ReasonString = "Packet Too Large", - }; - await this.DisconnectAsync(opts).ConfigureAwait(false); - return false; + Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); + Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); + + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.PacketTooLarge, + ReasonString = "Packet Too Large", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); + return false; + } } - } - switch (packet) + switch (packet) + { + case ConnAckPacket connAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + this.ConnectionProperties = connAckPacket.Properties; + this.OnConnAckReceivedEventLauncher(connAckPacket); + break; + case DisconnectPacket disconnectPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier} {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + Logger.Warn($"We shouldn't get the disconnect here - Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); + // Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + // await this.HandleDisconnectionAsync(false).ConfigureAwait(false); + // this.OnDisconnectReceivedEventLauncher(disconnectPacket); + break; + case PingRespPacket pingRespPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); + this.OnPingRespReceivedEventLauncher(pingRespPacket); + break; + case SubAckPacket subAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); + this.OnSubAckReceivedEventLauncher(subAckPacket); + break; + case UnsubAckPacket unsubAckPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); + this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); + break; + case PublishPacket publishPacket: + this.HandleIncomingPublishPacket(publishPacket); + break; + case PubAckPacket pubAckPacket: + this.HandleIncomingPubAckPacket(pubAckPacket); + break; + case PubRecPacket pubRecPacket: + this.HandleIncomingPubRecPacket(pubRecPacket); + break; + case PubRelPacket pubRelPacket: + this.HandleIncomingPubRelPacket(pubRelPacket); + break; + case PubCompPacket pubCompPacket: + this.HandleIncomingPubCompPacket(pubCompPacket); + break; + default: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); + Logger.Error($"Unrecognized packet received. Will discard. {packet}"); + break; + } // switch (packet) + } + else { - case ConnAckPacket connAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); - this.OnConnAckReceivedEventLauncher(connAckPacket); - break; - case DisconnectPacket disconnectPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier}"); - Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - await this.HandleDisconnectionAsync(false).ConfigureAwait(false); - this.OnDisconnectReceivedEventLauncher(disconnectPacket); - break; - case PingRespPacket pingRespPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); - this.OnPingRespReceivedEventLauncher(pingRespPacket); - break; - case SubAckPacket subAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); - this.OnSubAckReceivedEventLauncher(subAckPacket); - break; - case UnsubAckPacket unsubAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); - this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); - break; - case PublishPacket publishPacket: - this.HandleIncomingPublishPacket(publishPacket); - break; - case PubAckPacket pubAckPacket: - this.HandleIncomingPubAckPacket(pubAckPacket); - break; - case PubRecPacket pubRecPacket: - this.HandleIncomingPubRecPacket(pubRecPacket); - break; - case PubRelPacket pubRelPacket: - this.HandleIncomingPubRelPacket(pubRelPacket); - break; - case PubCompPacket pubCompPacket: - this.HandleIncomingPubCompPacket(pubCompPacket); - break; - default: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard."); - Logger.Error($"Unrecognized packet received. Will discard. {packet}"); - break; - } // switch (packet) - } // while + // Queue is empty + await Task.Delay(1).ConfigureAwait(false); + } // TryDequeue + } // while (true) Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}"); - return true; }, cancellationToken); @@ -399,16 +505,17 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Publish id={publishPacket.PacketIdentifier}"); this.OnPublishReceivedEventLauncher(publishPacket); - if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery) + if (publishPacket.Message.QoS is QualityOfService.AtLeastOnceDelivery) { // We've received a QoS 1 publish. Send a PubAck. var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); // FIXME We should wait until puback is sent before launching event // FIXME Check DUP flag setting - this.SendQueue.Add(pubAckResponse); + this.SendQueue.Enqueue(pubAckResponse); + publishPacket.OnPublishQoS1CompleteEventLauncher(pubAckResponse); } - else if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) + else if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { // We've received a QoS 2 publish. Send a PubRec and add to QoS2 transaction register. var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success); @@ -421,7 +528,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse; } - this.SendQueue.Add(pubRecResponse); + this.SendQueue.Enqueue(pubRecResponse); } this.OnMessageReceivedEventLauncher(publishPacket); @@ -483,13 +590,13 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) } // Send the PUBREL response - this.SendQueue.Add(pubRelResponsePacket); + this.SendQueue.Enqueue(pubRelResponsePacket); } else { // Send a PUBREL with PacketIdentifierNotFound var pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.PacketIdentifierNotFound); - this.SendQueue.Add(pubRelResponsePacket); + this.SendQueue.Enqueue(pubRelResponsePacket); } } @@ -524,7 +631,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) Logger.Warn($"QoS2: Couldn't remove PubRel --> PubComp QoS2 Chain for packet identifier {pubRelPacket.PacketIdentifier}."); } - this.SendQueue.Add(pubCompResponsePacket); + this.SendQueue.Enqueue(pubCompResponsePacket); } else { @@ -533,7 +640,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) // Send a PUBCOMP with PacketIdentifierNotFound var pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.PacketIdentifierNotFound); - this.SendQueue.Add(pubCompResponsePacket); + this.SendQueue.Enqueue(pubCompResponsePacket); } } diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs index 8c24b0ba..149ca7e2 100644 --- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs +++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs @@ -190,9 +190,6 @@ protected virtual void Dispose(bool disposing) } // Dispose managed resources. - this.SendQueue.CompleteAdding(); - this.ReceivedQueue.CompleteAdding(); - this.cancellationTokenSource.Cancel(); this.cancellationTokenSource.Dispose(); } diff --git a/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs index 5877f47f..aee5295d 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/ConnectTest.cs @@ -68,6 +68,7 @@ public async Task TestConnectEventsAsync() // Wait for event handlers to finish await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); + await Task.Delay(1000).ConfigureAwait(false); // Assert that all Events were called Assert.True(client.LocalStore.ContainsKey("BeforeConnectHandlerCalled")); @@ -107,6 +108,7 @@ public async Task Test_AfterDisconnectEvent_Async() // Wait for event handlers to finish await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); + await Task.Delay(1000).ConfigureAwait(false); // Assert that all Events were called Assert.True(client.LocalStore.ContainsKey("AfterDisconnectHandlerCalled")); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs index f1c260fc..8ffb03c7 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/LWTTest.cs @@ -30,7 +30,8 @@ public async Task Last_Will_With_Properties_Async() Assert.Equal("last will message", args.PublishMessage.PayloadAsString); Assert.Equal("application/text", args.PublishMessage.ContentType); Assert.Equal("response/topic", args.PublishMessage.ResponseTopic); - Assert.Equal(new byte[] { 1, 2, 3, 4, 5 }, args.PublishMessage.CorrelationData); + byte[] correlationData = [1, 2, 3, 4, 5]; + Assert.Equal(correlationData, args.PublishMessage.CorrelationData); Assert.Equal(MQTT5PayloadFormatIndicator.UTF8Encoded, args.PublishMessage.PayloadFormatIndicator); Assert.Equal(100, args.PublishMessage.MessageExpiryInterval); Assert.Single(args.PublishMessage.UserProperties); @@ -60,7 +61,8 @@ public async Task Last_Will_With_Properties_Async() options.LastWillAndTestament.QoS = QualityOfService.AtLeastOnceDelivery; options.LastWillAndTestament.ContentType = "application/text"; options.LastWillAndTestament.ResponseTopic = "response/topic"; - options.LastWillAndTestament.CorrelationData = new byte[] { 1, 2, 3, 4, 5 }; + byte[] correlationData = [1, 2, 3, 4, 5]; + options.LastWillAndTestament.CorrelationData = correlationData; options.LastWillAndTestament.UserProperties.Add("userPropertyKey", "userPropertyValue"); var client = new HiveMQClient(options); diff --git a/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs index 471b99f0..20455816 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/LastWillAndTestamentBuilderTest.cs @@ -41,6 +41,7 @@ public async Task Last_Will_With_Properties_Async() var messagesReceived = 0; var taskLWTReceived = new TaskCompletionSource(); + byte[] correlationDataBytes = [1, 2, 3, 4, 5]; // Set the event handler for the message received event listenerClient.OnMessageReceived += (sender, args) => @@ -51,7 +52,7 @@ public async Task Last_Will_With_Properties_Async() Assert.Equal("last will message", args.PublishMessage.PayloadAsString); Assert.Equal("application/text", args.PublishMessage.ContentType); Assert.Equal("response/topic", args.PublishMessage.ResponseTopic); - Assert.Equal(new byte[] { 1, 2, 3, 4, 5 }, args.PublishMessage.CorrelationData); + Assert.Equal(correlationDataBytes, args.PublishMessage.CorrelationData); Assert.Equal(MQTT5PayloadFormatIndicator.UTF8Encoded, args.PublishMessage.PayloadFormatIndicator); Assert.Equal(100, args.PublishMessage.MessageExpiryInterval); Assert.Single(args.PublishMessage.UserProperties); @@ -75,7 +76,7 @@ public async Task Last_Will_With_Properties_Async() .WithQualityOfServiceLevel(QualityOfService.AtLeastOnceDelivery) .WithContentType("application/text") .WithResponseTopic("response/topic") - .WithCorrelationData(new byte[] { 1, 2, 3, 4, 5 }) + .WithCorrelationData(correlationDataBytes) .WithPayloadFormatIndicator(MQTT5PayloadFormatIndicator.UTF8Encoded) .WithMessageExpiryInterval(100) .WithUserProperty("userPropertyKey", "userPropertyValue") diff --git a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs index bdcec6b7..a73b16d0 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs @@ -1,8 +1,10 @@ namespace HiveMQtt.Test.HiveMQClient; +using System.Runtime.CompilerServices; using System.Text; using System.Threading.Tasks; using HiveMQtt.Client; +using HiveMQtt.Client.Events; using HiveMQtt.MQTT5.ReasonCodes; using HiveMQtt.MQTT5.Types; using Xunit; @@ -31,7 +33,7 @@ public async Task MostBasicPublishWithQoS0Async() Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); var msg = new string(/*lang=json,strict*/ "{\"interference\": \"1029384\"}"); - var result = await client.PublishAsync("tests/MostBasicPublishWithQoS0Async", msg, MQTT5.Types.QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + var result = await client.PublishAsync("tests/MostBasicPublishWithQoS0Async", msg, QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false); Assert.True(disconnectResult); @@ -45,7 +47,7 @@ public async Task MostBasicPublishWithQoS1Async() Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); var msg = new string(/*lang=json,strict*/ "{\"interference\": \"1029384\"}"); - var result = await client.PublishAsync("tests/MostBasicPublishWithQoS1Async", msg, MQTT5.Types.QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + var result = await client.PublishAsync("tests/MostBasicPublishWithQoS1Async", msg, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); Assert.NotNull(result.QoS1ReasonCode); Assert.Equal(PubAckReasonCode.NoMatchingSubscribers, result?.QoS1ReasonCode); @@ -61,7 +63,7 @@ public async Task MostBasicPublishWithQoS2Async() Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); var msg = new string(/*lang=json,strict*/ "{\"interference\": \"1029384\"}"); - var result = await client.PublishAsync("tests/MostBasicPublishWithQoS2Async", msg, MQTT5.Types.QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + var result = await client.PublishAsync("tests/MostBasicPublishWithQoS2Async", msg, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); Assert.NotNull(result.QoS2ReasonCode); Assert.Equal(PubRecReasonCode.NoMatchingSubscribers, result?.QoS2ReasonCode); @@ -81,7 +83,7 @@ public async Task MultiPublishWithQoS0Async() for (var i = 1; i <= 10; i++) { - result = await client.PublishAsync("tests/MultiPublishWithQoS0Async", msg, MQTT5.Types.QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + result = await client.PublishAsync("tests/MultiPublishWithQoS0Async", msg, QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); Assert.IsType(result); Assert.Null(result.QoS1ReasonCode); Assert.Null(result.QoS2ReasonCode); @@ -104,7 +106,7 @@ public async Task MultiPublishWithQoS1Async() for (var i = 1; i <= 10; i++) { - result = await client.PublishAsync("tests/MultiPublishWithQoS1Async", msg, MQTT5.Types.QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + result = await client.PublishAsync("tests/MultiPublishWithQoS1Async", msg, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); Assert.IsType(result); Assert.NotNull(result.QoS1ReasonCode); Assert.Equal(PubAckReasonCode.NoMatchingSubscribers, result?.QoS1ReasonCode); @@ -126,7 +128,7 @@ public async Task MultiPublishWithQoS2Async() for (var i = 1; i <= 10; i++) { - result = await client.PublishAsync("tests/MultiPublishWithQoS2Async", msg, MQTT5.Types.QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + result = await client.PublishAsync("tests/MultiPublishWithQoS2Async", msg, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); Assert.NotNull(result.QoS2ReasonCode); Assert.Equal(PubRecReasonCode.NoMatchingSubscribers, result?.QoS2ReasonCode); } @@ -175,4 +177,266 @@ public async Task PublishPayloadFormatIndicatorAsync() var disconnectResult = await client.DisconnectAsync().ConfigureAwait(false); Assert.True(disconnectResult); } + + [Fact] + public async Task ThreeNodeQoS0ChainedPublishesAsync() + { + var client1 = new HiveMQClient(); // publish message + var client2 = new HiveMQClient(); // receive and re-publish to another topic + var client3 = new HiveMQClient(); // receive republished message + + // Connect client 1 + var connectResult = await client1.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // Connect client 2 + connectResult = await client2.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // Connect client 3 + connectResult = await client3.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // client 2 Subscribe to the topic + var subscribeResult = await client2.SubscribeAsync("HMQ/FirstTopic", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + var client2MessageCount = 0; + + // client 2 will receive the message and republish it to another topic + async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) + { + client2MessageCount++; + if (sender is HiveMQClient client) + { + var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtMostOnceDelivery).ConfigureAwait(true); + Assert.NotNull(publishResult); + } + } + + client2.OnMessageReceived += Client2MessageHandler; + + // client 3 Subscribe to the secondary topic + subscribeResult = await client3.SubscribeAsync("HMQ/SecondTopic", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + var client3MessageCount = 0; + + // client 3 will receive the final message + async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) + { + client3MessageCount++; + Assert.NotNull(eventArgs.PublishMessage); + Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString); + } + + client3.OnMessageReceived += Client3MessageHandler; + + // client 1 Publish 100 messages + for (var i = 1; i <= 10; i++) + { + var publishResult = await client1.PublishAsync("HMQ/FirstTopic", "Hello World", QualityOfService.AtMostOnceDelivery).ConfigureAwait(false); + Assert.NotNull(publishResult); + } + + await Task.Delay(3000).ConfigureAwait(false); + + Assert.Equal(10, client2MessageCount); + Assert.Equal(10, client3MessageCount); + + Assert.Empty(client1.OutgoingPublishQueue); + Assert.Empty(client2.OutgoingPublishQueue); + Assert.Empty(client3.OutgoingPublishQueue); + + Assert.Empty(client1.ReceivedQueue); + Assert.Empty(client2.ReceivedQueue); + Assert.Empty(client3.ReceivedQueue); + + Assert.Empty(client1.SendQueue); + Assert.Empty(client2.SendQueue); + Assert.Empty(client3.SendQueue); + + Assert.Empty(client1.transactionQueue); + Assert.Empty(client2.transactionQueue); + Assert.Empty(client3.transactionQueue); + + // All done, disconnect all clients + var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + disconnectResult = await client2.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + disconnectResult = await client3.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + } + + [Fact] + public async Task ThreeNodeQoS1ChainedPublishesAsync() + { + var client1 = new HiveMQClient(); // publish message + var client2 = new HiveMQClient(); // receive and re-publish to another topic + var client3 = new HiveMQClient(); // receive republished message + + // Connect client 1 + var connectResult = await client1.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // Connect client 2 + connectResult = await client2.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // Connect client 3 + connectResult = await client3.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // client 2 Subscribe to the topic + var subscribeResult = await client2.SubscribeAsync("HMQ/FirstTopic", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + var client2MessageCount = 0; + + // client 2 will receive the message and republish it to another topic + async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) + { + client2MessageCount++; + if (sender is HiveMQClient client) + { + var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + Assert.NotNull(publishResult); + Assert.Equal(publishResult.QoS1ReasonCode, PubAckReasonCode.Success); + } + } + + client2.OnMessageReceived += Client2MessageHandler; + + // client 3 Subscribe to the secondary topic + subscribeResult = await client3.SubscribeAsync("HMQ/SecondTopic", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + + var client3MessageCount = 0; + + // client 3 will receive the final message + async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) + { + client3MessageCount++; + Assert.NotNull(eventArgs.PublishMessage); + Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString); + } + + client3.OnMessageReceived += Client3MessageHandler; + + // client 1 Publish 10 messages + for (var i = 1; i <= 10; i++) + { + var publishResult = await client1.PublishAsync("HMQ/FirstTopic", "Hello World", QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false); + Assert.NotNull(publishResult); + } + + await Task.Delay(2000).ConfigureAwait(false); + + Assert.Equal(10, client2MessageCount); + Assert.Equal(10, client3MessageCount); + + Assert.Empty(client1.OutgoingPublishQueue); + Assert.Empty(client2.OutgoingPublishQueue); + Assert.Empty(client3.OutgoingPublishQueue); + + Assert.Empty(client1.ReceivedQueue); + Assert.Empty(client2.ReceivedQueue); + Assert.Empty(client3.ReceivedQueue); + + Assert.Empty(client1.SendQueue); + Assert.Empty(client2.SendQueue); + Assert.Empty(client3.SendQueue); + + Assert.Empty(client1.transactionQueue); + Assert.Empty(client2.transactionQueue); + Assert.Empty(client3.transactionQueue); + + // All done, disconnect all clients + var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + disconnectResult = await client2.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + disconnectResult = await client3.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + } + + [Fact] + public async Task ThreeNodeQoS2ChainedPublishesAsync() + { + var client1 = new HiveMQClient(); // publish message + var client2 = new HiveMQClient(); // receive and re-publish to another topic + var client3 = new HiveMQClient(); // receive republished message + + // Connect client 1 + var connectResult = await client1.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // Connect client 2 + connectResult = await client2.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // Connect client 3 + connectResult = await client3.ConnectAsync().ConfigureAwait(false); + Assert.True(connectResult.ReasonCode == ConnAckReasonCode.Success); + + // client 2 Subscribe to the topic + var subscribeResult = await client2.SubscribeAsync("HMQ/FirstTopic", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + var client2MessageCount = 0; + + // client 2 will receive the message and republish it to another topic + async void Client2MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) + { + client2MessageCount++; + var client = sender as HiveMQClient; + var publishResult = await client.PublishAsync("HMQ/SecondTopic", eventArgs.PublishMessage.PayloadAsString, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(true); + Assert.NotNull(publishResult); + Assert.Equal(publishResult.QoS2ReasonCode, PubRecReasonCode.Success); + } + + client2.OnMessageReceived += Client2MessageHandler; + + // client 3 Subscribe to the secondary topic + subscribeResult = await client3.SubscribeAsync("HMQ/SecondTopic", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + + var client3MessageCount = 0; + // client 3 will receive the final message + async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs eventArgs) + { + client3MessageCount++; + Assert.NotNull(eventArgs.PublishMessage); + Assert.Equal("Hello World", eventArgs.PublishMessage.PayloadAsString); + } + + client3.OnMessageReceived += Client3MessageHandler; + + // client 1 Publish 10 messages + for (var i = 1; i <= 10; i++) + { + var publishResult = await client1.PublishAsync("HMQ/FirstTopic", "Hello World", QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false); + Assert.NotNull(publishResult); + } + + await Task.Delay(2000).ConfigureAwait(false); + + Assert.Equal(10, client2MessageCount); + Assert.Equal(10, client3MessageCount); + + Assert.Empty(client1.OutgoingPublishQueue); + Assert.Empty(client2.OutgoingPublishQueue); + Assert.Empty(client3.OutgoingPublishQueue); + + Assert.Empty(client1.ReceivedQueue); + Assert.Empty(client2.ReceivedQueue); + Assert.Empty(client3.ReceivedQueue); + + Assert.Empty(client1.SendQueue); + Assert.Empty(client2.SendQueue); + Assert.Empty(client3.SendQueue); + + Assert.Empty(client1.transactionQueue); + Assert.Empty(client2.transactionQueue); + Assert.Empty(client3.transactionQueue); + + // All done, disconnect all clients + var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + disconnectResult = await client2.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + disconnectResult = await client3.DisconnectAsync().ConfigureAwait(false); + Assert.True(disconnectResult); + } } diff --git a/Tests/HiveMQtt.Test/HiveMQClient/UnsubscribeBuilderTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/UnsubscribeBuilderTest.cs index 7c3ff246..a3bd2a9c 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/UnsubscribeBuilderTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/UnsubscribeBuilderTest.cs @@ -103,5 +103,4 @@ public async Task InvalidUnsubscribeStringAsync() var disconnectResult = await subClient.DisconnectAsync().ConfigureAwait(false); Assert.True(disconnectResult); } - }