From 684385d819b493a67fc087ac5e38bb1396ff9c3f Mon Sep 17 00:00:00 2001 From: Peter Giacomo Lombardo Date: Tue, 21 May 2024 15:49:04 +0200 Subject: [PATCH] Events: Efficiency & Performance Improvements (#161) --- Source/HiveMQtt/Client/HiveMQClient.cs | 20 +- Source/HiveMQtt/Client/HiveMQClientEvents.cs | 692 +++++++++++------- .../Client/HiveMQClientTrafficProcessor.cs | 42 +- Source/HiveMQtt/MQTT5/PacketDecoder.cs | 2 +- Source/HiveMQtt/NLog.config | 2 +- .../HiveMQtt.Test/HiveMQClient/PublishTest.cs | 22 +- 6 files changed, 465 insertions(+), 315 deletions(-) diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index ca9c5a2a..9542b751 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -91,7 +91,7 @@ public async Task ConnectAsync() // Construct the MQTT Connect packet and queue to send var connPacket = new ConnectPacket(this.Options); - Logger.Trace($"Queuing packet for send: {connPacket}"); + Logger.Trace($"Queuing packet for send: {connPacket.GetType().Name} id={connPacket.PacketIdentifier}"); this.SendQueue.Enqueue(connPacket); ConnAckPacket connAck; @@ -162,7 +162,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) EventHandler eventHandler = TaskHandler; this.OnDisconnectSent += eventHandler; - Logger.Trace($"Queuing packet for send: {disconnectPacket}"); + Logger.Trace($"Queuing packet for send: {disconnectPacket.GetType().Name} id={disconnectPacket.PacketIdentifier}"); this.SendQueue.Enqueue(disconnectPacket); try @@ -187,7 +187,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) /// public async Task PublishAsync(MQTT5PublishMessage message) { - if (this.IsConnected() == false) + if (!this.IsConnected()) { throw new HiveMQttClientException("PublishAsync: Client is not connected. Check client.IsConnected() before calling PublishAsync."); } @@ -200,7 +200,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) // QoS 0: Fast Service if (message.QoS == QualityOfService.AtMostOnceDelivery) { - Logger.Trace($"Queuing packet for send: {publishPacket}"); + Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); return new PublishResult(publishPacket.Message); } @@ -212,7 +212,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) EventHandler eventHandler = TaskHandler; publishPacket.OnPublishQoS1Complete += eventHandler; - Logger.Trace($"Queuing packet for send: {publishPacket}"); + Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); @@ -229,7 +229,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) EventHandler eventHandler = TaskHandler; publishPacket.OnPublishQoS2Complete += eventHandler; - Logger.Trace($"Queuing packet for send: {publishPacket}"); + Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); List packetList; @@ -244,7 +244,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) Logger.Error("PublishAsync: QoS 2 timeout. No response received in time."); // Remove the transaction chain - if (this.transactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain)) + if (this.TransactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain)) { Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}."); } @@ -320,7 +320,7 @@ public async Task SubscribeAsync(string topic, QualityOfService /// public async Task SubscribeAsync(SubscribeOptions options) { - if (this.IsConnected() == false) + if (!this.IsConnected()) { throw new HiveMQttClientException("SubscribeAsync: Client is not connected. Check client.IsConnected() before calling SubscribeAsync."); } @@ -329,7 +329,7 @@ public async Task SubscribeAsync(SubscribeOptions options) this.BeforeSubscribeEventLauncher(options); // FIXME: We should only ever have one subscribe in flight at any time (for now) - // Construct the MQTT Connect packet + // Construct the MQTT Subscribe packet var packetIdentifier = this.GeneratePacketIdentifier(); var subscribePacket = new SubscribePacket(options, (ushort)packetIdentifier); @@ -433,7 +433,7 @@ public async Task UnsubscribeAsync(List subscri public async Task UnsubscribeAsync(UnsubscribeOptions unsubOptions) { - if (this.IsConnected() == false) + if (!this.IsConnected()) { throw new HiveMQttClientException("UnsubscribeAsync: Client is not connected. Check client.IsConnected() before calling UnsubscribeAsync."); } diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs index e4ece8de..8a13baf8 100644 --- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs +++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs @@ -32,183 +32,245 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient /// /// Event that is fired before the client connects to the broker. /// - public event EventHandler BeforeConnect = new((client, e) => { }); + public event EventHandler? BeforeConnect; protected virtual void BeforeConnectEventLauncher(HiveMQClientOptions options) { - var eventArgs = new BeforeConnectEventArgs(options); - Logger.Trace("BeforeConnectEventLauncher"); - _ = Task.Run(() => this.BeforeConnect?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.BeforeConnect != null && this.BeforeConnect.GetInvocationList().Length > 0) + { + Logger.Trace("BeforeConnectEventLauncher"); + var eventArgs = new BeforeConnectEventArgs(options); + _ = Task.Run(() => this.BeforeConnect?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("BeforeConnectEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("BeforeConnectEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client connects to the broker. /// - public event EventHandler AfterConnect = new((client, e) => { }); + public event EventHandler? AfterConnect; protected virtual void AfterConnectEventLauncher(ConnectResult results) { - var eventArgs = new AfterConnectEventArgs(results); - Logger.Trace("AfterConnectEventLauncher"); - _ = Task.Run(() => this.AfterConnect?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.AfterConnect != null && this.AfterConnect.GetInvocationList().Length > 0) + { + var eventArgs = new AfterConnectEventArgs(results); + Logger.Trace("AfterConnectEventLauncher"); + _ = Task.Run(() => this.AfterConnect?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("AfterConnectEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("AfterConnectEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired before the client disconnects from the broker. /// - public event EventHandler BeforeDisconnect = new((client, e) => { }); + public event EventHandler? BeforeDisconnect; protected virtual void BeforeDisconnectEventLauncher() { - var eventArgs = new BeforeDisconnectEventArgs(); - Logger.Trace("BeforeDisconnectEventLauncher"); - _ = Task.Run(() => this.BeforeDisconnect?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.BeforeDisconnect != null && this.BeforeDisconnect.GetInvocationList().Length > 0) + { + var eventArgs = new BeforeDisconnectEventArgs(); + Logger.Trace("BeforeDisconnectEventLauncher"); + _ = Task.Run(() => this.BeforeDisconnect?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("BeforeDisconnectEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("BeforeDisconnectEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client is disconnected from the broker. /// - public event EventHandler AfterDisconnect = new((client, e) => { }); + public event EventHandler? AfterDisconnect; protected virtual void AfterDisconnectEventLauncher(bool clean = false) { - var eventArgs = new AfterDisconnectEventArgs(clean); - Logger.Trace("AfterDisconnectEventLauncher"); - _ = Task.Run(() => this.AfterDisconnect?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.AfterDisconnect != null && this.AfterDisconnect.GetInvocationList().Length > 0) + { + var eventArgs = new AfterDisconnectEventArgs(clean); + Logger.Trace("AfterDisconnectEventLauncher"); + _ = Task.Run(() => this.AfterDisconnect?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("AfterDisconnectEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("AfterDisconnectEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired before the client sends a subscribe request. /// - public event EventHandler BeforeSubscribe = new((client, e) => { }); + public event EventHandler? BeforeSubscribe; protected virtual void BeforeSubscribeEventLauncher(SubscribeOptions options) { - var eventArgs = new BeforeSubscribeEventArgs(options); - Logger.Trace("BeforeSubscribeEventLauncher"); - _ = Task.Run(() => this.BeforeSubscribe?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.BeforeSubscribe != null && this.BeforeSubscribe.GetInvocationList().Length > 0) + { + var eventArgs = new BeforeSubscribeEventArgs(options); + Logger.Trace("BeforeSubscribeEventLauncher"); + _ = Task.Run(() => this.BeforeSubscribe?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("BeforeSubscribeEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("BeforeSubscribeEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a subscribe request. /// - public event EventHandler AfterSubscribe = new((client, e) => { }); + public event EventHandler? AfterSubscribe; protected virtual void AfterSubscribeEventLauncher(SubscribeResult results) { - var eventArgs = new AfterSubscribeEventArgs(results); - Logger.Trace("AfterSubscribeEventLauncher"); - _ = Task.Run(() => this.AfterSubscribe?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.AfterSubscribe != null && this.AfterSubscribe.GetInvocationList().Length > 0) + { + var eventArgs = new AfterSubscribeEventArgs(results); + Logger.Trace("AfterSubscribeEventLauncher"); + _ = Task.Run(() => this.AfterSubscribe?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("AfterSubscribeEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("AfterSubscribeEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired before the client sends a subscribe request. /// - public event EventHandler BeforeUnsubscribe = new((client, e) => { }); + public event EventHandler? BeforeUnsubscribe; protected virtual void BeforeUnsubscribeEventLauncher(List subscriptions) { - var eventArgs = new BeforeUnsubscribeEventArgs(subscriptions); - Logger.Trace("BeforeUnsubscribeEventLauncher"); - _ = Task.Run(() => this.BeforeUnsubscribe?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.BeforeUnsubscribe != null && this.BeforeUnsubscribe.GetInvocationList().Length > 0) + { + var eventArgs = new BeforeUnsubscribeEventArgs(subscriptions); + Logger.Trace("BeforeUnsubscribeEventLauncher"); + _ = Task.Run(() => this.BeforeUnsubscribe?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("BeforeUnsubscribeEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("BeforeUnsubscribeEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a subscribe request. /// - public event EventHandler AfterUnsubscribe = new((client, e) => { }); + public event EventHandler? AfterUnsubscribe; protected virtual void AfterUnsubscribeEventLauncher(UnsubscribeResult results) { - var eventArgs = new AfterUnsubscribeEventArgs(results); - Logger.Trace("AfterUnsubscribeEventLauncher"); - _ = Task.Run(() => this.AfterUnsubscribe?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.AfterUnsubscribe != null && this.AfterUnsubscribe.GetInvocationList().Length > 0) + { + var eventArgs = new AfterUnsubscribeEventArgs(results); + Logger.Trace("AfterUnsubscribeEventLauncher"); + _ = Task.Run(() => this.AfterUnsubscribe?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("AfterUnsubscribeEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("AfterUnsubscribeEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired when a message is received from the broker. /// - public event EventHandler OnMessageReceived = new((client, e) => { }); + public event EventHandler? OnMessageReceived; protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) { var eventArgs = new OnMessageReceivedEventArgs(packet.Message); - Logger.Trace("OnMessageReceivedEventLauncher"); + var messageHandled = false; - // Global Event Handler - _ = Task.Run(() => this.OnMessageReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnMessageReceived != null && this.OnMessageReceived.GetInvocationList().Length > 0) + { + Logger.Trace("OnMessageReceivedEventLauncher"); + + // Global Event Handler + _ = Task.Run(() => this.OnMessageReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnMessageReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnMessageReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + + messageHandled = true; + } // Per Subscription Event Handler foreach (var subscription in this.Subscriptions) { if (packet.Message.Topic != null && MatchTopic(subscription.TopicFilter.Topic, packet.Message.Topic)) { - if (subscription.MessageReceivedHandler != null) + if (subscription.MessageReceivedHandler != null && subscription.MessageReceivedHandler.GetInvocationList().Length > 0) { - _ = Task.Run(() => subscription.MessageReceivedHandler?.Invoke(this, eventArgs)).ContinueWith(t => + // We have a per-subscription message handler. + _ = Task.Run(() => subscription.MessageReceivedHandler?.Invoke(this, eventArgs)).ContinueWith( + t => { if (t.IsFaulted) { - Logger.Error("per-subscription OnMessageReceivedEventLauncher exception: " + t.Exception.Message); + Logger.Error("per-subscription MessageReceivedEventLauncher exception: " + t.Exception.Message); } - }); + }, + TaskScheduler.Default); + + messageHandled = true; } } } + + if (!messageHandled) + { + // We received an application message for a subscription without a MessageReceivedHandler + // AND there is also no global OnMessageReceived event handler. This publish is thus lost and unhandled. + // We warn here about the lost message, but we don't throw an exception. + Logger.Warn($"Lost Application Message ({packet.Message.Topic}): No global or subscription message handler found. Register an event handler (before Subscribing) to receive all messages incoming."); + } } /* ========================================================================================= */ @@ -218,360 +280,460 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) /// /// Event that is fired after the client sends a CONNECT packet to the broker. /// - public event EventHandler OnConnectSent = new((client, e) => { }); + public event EventHandler? OnConnectSent; protected virtual void OnConnectSentEventLauncher(ConnectPacket packet) { - var eventArgs = new OnConnectSentEventArgs(packet); - Logger.Trace("OnConnectSentEventLauncher"); - _ = Task.Run(() => this.OnConnectSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnConnectSent != null && this.OnConnectSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnConnectSentEventArgs(packet); + Logger.Trace("OnConnectSentEventLauncher"); + _ = Task.Run(() => this.OnConnectSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnConnectSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnConnectSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a CONNACK packet from the broker. /// - public event EventHandler OnConnAckReceived = new((client, e) => { }); + public event EventHandler? OnConnAckReceived; protected virtual void OnConnAckReceivedEventLauncher(ConnAckPacket packet) { - var eventArgs = new OnConnAckReceivedEventArgs(packet); - Logger.Trace("OnConnAckReceivedEventLauncher"); - _ = Task.Run(() => this.OnConnAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnConnAckReceived != null && this.OnConnAckReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnConnAckReceivedEventArgs(packet); + Logger.Trace("OnConnAckReceivedEventLauncher"); + _ = Task.Run(() => this.OnConnAckReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnConnAckReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnConnAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client send a Disconnect packet from the broker. /// - public event EventHandler OnDisconnectSent = new((client, e) => { }); + public event EventHandler? OnDisconnectSent; protected virtual void OnDisconnectSentEventLauncher(DisconnectPacket packet) { - var eventArgs = new OnDisconnectSentEventArgs(packet); - Logger.Trace("OnDisconnectSentEventLauncher"); - _ = Task.Run(() => this.OnDisconnectSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnDisconnectSent != null && this.OnDisconnectSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnDisconnectSentEventArgs(packet); + Logger.Trace("OnDisconnectSentEventLauncher"); + _ = Task.Run(() => this.OnDisconnectSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnDisconnectSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnDisconnectSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a Disconnect packet from the broker. /// - public event EventHandler OnDisconnectReceived = new((client, e) => { }); + public event EventHandler? OnDisconnectReceived; protected virtual void OnDisconnectReceivedEventLauncher(DisconnectPacket packet) { - var eventArgs = new OnDisconnectReceivedEventArgs(packet); - Logger.Trace("OnDisconnectReceivedEventLauncher: ReasonCode: " + packet.DisconnectReasonCode + " ReasonString: " + packet.Properties.ReasonString); - _ = Task.Run(() => this.OnDisconnectReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnDisconnectReceived != null && this.OnDisconnectReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnDisconnectReceivedEventArgs(packet); + Logger.Trace("OnDisconnectReceivedEventLauncher: ReasonCode: " + packet.DisconnectReasonCode + " ReasonString: " + packet.Properties.ReasonString); + _ = Task.Run(() => this.OnDisconnectReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnDisconnectReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnDisconnectReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client send a PingReq packet from the broker. /// - public event EventHandler OnPingReqSent = new((client, e) => { }); + public event EventHandler? OnPingReqSent; protected virtual void OnPingReqSentEventLauncher(PingReqPacket packet) { - var eventArgs = new OnPingReqSentEventArgs(packet); - Logger.Trace("OnPingReqSentEventLauncher"); - _ = Task.Run(() => this.OnPingReqSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPingReqSent != null && this.OnPingReqSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnPingReqSentEventArgs(packet); + Logger.Trace("OnPingReqSentEventLauncher"); + _ = Task.Run(() => this.OnPingReqSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPingReqSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPingReqSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client send a PingReq packet from the broker. /// - public event EventHandler OnPingRespReceived = new((client, e) => { }); + public event EventHandler? OnPingRespReceived; protected virtual void OnPingRespReceivedEventLauncher(PingRespPacket packet) { - var eventArgs = new OnPingRespReceivedEventArgs(packet); - Logger.Trace("OnPingRespReceivedEventLauncher"); - _ = Task.Run(() => this.OnPingRespReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPingRespReceived != null && this.OnPingRespReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnPingRespReceivedEventArgs(packet); + Logger.Trace("OnPingRespReceivedEventLauncher"); + _ = Task.Run(() => this.OnPingRespReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPingRespReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPingRespReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a Subscribe packet to the broker. /// - public event EventHandler OnSubscribeSent = new((client, e) => { }); + public event EventHandler? OnSubscribeSent; protected virtual void OnSubscribeSentEventLauncher(SubscribePacket packet) { - var eventArgs = new OnSubscribeSentEventArgs(packet); - Logger.Trace("OnSubscribeSentEventLauncher"); - _ = Task.Run(() => this.OnSubscribeSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnSubscribeSent != null && this.OnSubscribeSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnSubscribeSentEventArgs(packet); + Logger.Trace("OnSubscribeSentEventLauncher"); + _ = Task.Run(() => this.OnSubscribeSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnSubscribeSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnSubscribeSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a SubAck packet from the broker. /// - public event EventHandler OnSubAckReceived = new((client, e) => { }); + public event EventHandler? OnSubAckReceived; protected virtual void OnSubAckReceivedEventLauncher(SubAckPacket packet) { - var eventArgs = new OnSubAckReceivedEventArgs(packet); - Logger.Trace("OnSubAckReceivedEventLauncher"); - _ = Task.Run(() => this.OnSubAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnSubAckReceived != null && this.OnSubAckReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnSubAckReceivedEventArgs(packet); + Logger.Trace("OnSubAckReceivedEventLauncher"); + _ = Task.Run(() => this.OnSubAckReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnSubAckReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnSubAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a Unsubscribe packet to the broker. /// - public event EventHandler OnUnsubscribeSent = new((client, e) => { }); + public event EventHandler? OnUnsubscribeSent; protected virtual void OnUnsubscribeSentEventLauncher(UnsubscribePacket packet) { - var eventArgs = new OnUnsubscribeSentEventArgs(packet); - Logger.Trace("OnUnsubscribeSentEventLauncher"); - _ = Task.Run(() => this.OnUnsubscribeSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnUnsubscribeSent != null && this.OnUnsubscribeSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnUnsubscribeSentEventArgs(packet); + Logger.Trace("OnUnsubscribeSentEventLauncher"); + _ = Task.Run(() => this.OnUnsubscribeSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnUnsubscribeSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnUnsubscribeSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a UnsubAck packet from the broker. /// - public event EventHandler OnUnsubAckReceived = new((client, e) => { }); + public event EventHandler? OnUnsubAckReceived; protected virtual void OnUnsubAckReceivedEventLauncher(UnsubAckPacket packet) { - var eventArgs = new OnUnsubAckReceivedEventArgs(packet); - Logger.Trace("OnUnsubAckReceivedEventLauncher"); - _ = Task.Run(() => this.OnUnsubAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnUnsubAckReceived != null && this.OnUnsubAckReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnUnsubAckReceivedEventArgs(packet); + Logger.Trace("OnUnsubAckReceivedEventLauncher"); + _ = Task.Run(() => this.OnUnsubAckReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnUnsubAckReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnUnsubAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a Publish packet from the broker. /// - public event EventHandler OnPublishReceived = new((client, e) => { }); + public event EventHandler? OnPublishReceived; protected virtual void OnPublishReceivedEventLauncher(PublishPacket packet) { - var eventArgs = new OnPublishReceivedEventArgs(packet); - Logger.Trace("OnPublishReceivedEventLauncher"); - _ = Task.Run(() => this.OnPublishReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPublishReceived != null && this.OnPublishReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnPublishReceivedEventArgs(packet); + Logger.Trace("OnPublishReceivedEventLauncher"); + _ = Task.Run(() => this.OnPublishReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPublishReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPublishReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a Publish packet to the broker. /// - public event EventHandler OnPublishSent = new((client, e) => { }); + public event EventHandler? OnPublishSent; protected virtual void OnPublishSentEventLauncher(PublishPacket packet) { - var eventArgs = new OnPublishSentEventArgs(packet); - Logger.Trace("OnPublishSentEventLauncher"); - _ = Task.Run(() => this.OnPublishSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPublishSent != null && this.OnPublishSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnPublishSentEventArgs(packet); + Logger.Trace("OnPublishSentEventLauncher"); + _ = Task.Run(() => this.OnPublishSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPublishSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPublishSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a PubAck packet from the broker. /// - public event EventHandler OnPubAckReceived = new((client, e) => { }); + public event EventHandler? OnPubAckReceived; protected virtual void OnPubAckReceivedEventLauncher(PubAckPacket packet) { - var eventArgs = new OnPubAckReceivedEventArgs(packet); - Logger.Trace("OnPubAckReceivedEventLauncher"); - _ = Task.Run(() => this.OnPubAckReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubAckReceived != null && this.OnPubAckReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubAckReceivedEventArgs(packet); + Logger.Trace("OnPubAckReceivedEventLauncher"); + _ = Task.Run(() => this.OnPubAckReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPubAckReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPubAckReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a PubAck packet from the broker. /// - public event EventHandler OnPubAckSent = new((client, e) => { }); + public event EventHandler? OnPubAckSent; protected virtual void OnPubAckSentEventLauncher(PubAckPacket packet) { - var eventArgs = new OnPubAckSentEventArgs(packet); - Logger.Trace("OnPubAckSentEventLauncher"); - _ = Task.Run(() => this.OnPubAckSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubAckSent != null && this.OnPubAckSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubAckSentEventArgs(packet); + Logger.Trace("OnPubAckSentEventLauncher"); + _ = Task.Run(() => this.OnPubAckSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPubAckSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPubAckSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a PubRec packet from the broker. /// - public event EventHandler OnPubRecReceived = new((client, e) => { }); + public event EventHandler? OnPubRecReceived; protected virtual void OnPubRecReceivedEventLauncher(PubRecPacket packet) { - var eventArgs = new OnPubRecReceivedEventArgs(packet); - Logger.Trace("OnPubRecReceivedEventLauncher"); - _ = Task.Run(() => this.OnPubRecReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubRecReceived != null && this.OnPubRecReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubRecReceivedEventArgs(packet); + Logger.Trace("OnPubRecReceivedEventLauncher"); + _ = Task.Run(() => this.OnPubRecReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPubRecReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPubRecReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a PubRec packet to the broker. /// - public event EventHandler OnPubRecSent = new((client, e) => { }); + public event EventHandler? OnPubRecSent; protected virtual void OnPubRecSentEventLauncher(PubRecPacket packet) { - var eventArgs = new OnPubRecSentEventArgs(packet); - Logger.Trace("OnPubRecSentEventLauncher"); - _ = Task.Run(() => this.OnPubRecSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubRecSent != null && this.OnPubRecSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubRecSentEventArgs(packet); + Logger.Trace("OnPubRecSentEventLauncher"); + _ = Task.Run(() => this.OnPubRecSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPubRecSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPubRecSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client received a PubRel packet from the broker. /// - public event EventHandler OnPubRelReceived = new((client, e) => { }); + public event EventHandler? OnPubRelReceived; protected virtual void OnPubRelReceivedEventLauncher(PubRelPacket packet) { - var eventArgs = new OnPubRelReceivedEventArgs(packet); - Logger.Trace("OnPubRelReceivedEventLauncher"); - _ = Task.Run(() => this.OnPubRelReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubRelReceived != null && this.OnPubRelReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubRelReceivedEventArgs(packet); + Logger.Trace("OnPubRelReceivedEventLauncher"); + _ = Task.Run(() => this.OnPubRelReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPubRelReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPubRelReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sent a PubRel packet to the broker. /// - public event EventHandler OnPubRelSent = new((client, e) => { }); + public event EventHandler? OnPubRelSent; protected virtual void OnPubRelSentEventLauncher(PubRelPacket packet) { - var eventArgs = new OnPubRelSentEventArgs(packet); - Logger.Trace("OnPubRelSentEventLauncher"); - _ = Task.Run(() => this.OnPubRelSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubRelSent != null && this.OnPubRelSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubRelSentEventArgs(packet); + Logger.Trace("OnPubRelSentEventLauncher"); + _ = Task.Run(() => this.OnPubRelSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("OnPubRelSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("OnPubRelSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client receives a PubComp packet from the broker. /// - public event EventHandler OnPubCompReceived = new((client, e) => { }); + public event EventHandler? OnPubCompReceived; protected virtual void OnPubCompReceivedEventLauncher(PubCompPacket packet) { - var eventArgs = new OnPubCompReceivedEventArgs(packet); - Logger.Trace("PubCompReceivedEventLauncher"); - _ = Task.Run(() => this.OnPubCompReceived?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubCompReceived != null && this.OnPubCompReceived.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubCompReceivedEventArgs(packet); + Logger.Trace("PubCompReceivedEventLauncher"); + _ = Task.Run(() => this.OnPubCompReceived?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("PubCompReceivedEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("PubCompReceivedEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } /// /// Event that is fired after the client sends a PubComp packet to the broker. /// - public event EventHandler OnPubCompSent = new((client, e) => { }); + public event EventHandler? OnPubCompSent; protected virtual void OnPubCompSentEventLauncher(PubCompPacket packet) { - var eventArgs = new OnPubCompSentEventArgs(packet); - Logger.Trace("PubCompSentEventLauncher"); - _ = Task.Run(() => this.OnPubCompSent?.Invoke(this, eventArgs)).ContinueWith(t => - { - if (t.IsFaulted) + if (this.OnPubCompSent != null && this.OnPubCompSent.GetInvocationList().Length > 0) + { + var eventArgs = new OnPubCompSentEventArgs(packet); + Logger.Trace("PubCompSentEventLauncher"); + _ = Task.Run(() => this.OnPubCompSent?.Invoke(this, eventArgs)).ContinueWith( + t => { - Logger.Error("PubCompSentEventLauncher exception: " + t.Exception.Message); - } - }); + if (t.IsFaulted) + { + Logger.Error("PubCompSentEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } } diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index 520c11ff..26d6516e 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -40,7 +40,7 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient internal ConcurrentQueue ReceivedQueue { get; } = new(); // Transactional packets indexed by packet identifier - internal readonly ConcurrentDictionary> transactionQueue = new(); + internal ConcurrentDictionary> TransactionQueue { get; } = new(); private readonly Stopwatch lastCommunicationTimer = new(); @@ -78,7 +78,7 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) = // Dumping Client State Logger.Trace($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}"); Logger.Trace($"{this.Options.ClientId}-(CM)- SendQueue:{this.SendQueue.Count} ReceivedQueue:{this.ReceivedQueue.Count} OutgoingPublishQueue:{this.OutgoingPublishQueue.Count}"); - Logger.Trace($"{this.Options.ClientId}-(CM)- TransactionQueue:{this.transactionQueue.Count}"); + Logger.Trace($"{this.Options.ClientId}-(CM)- TransactionQueue:{this.TransactionQueue.Count}"); Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionMonitor:{this.ConnectionMonitorTask?.Status}"); Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionPublishWriter:{this.ConnectionPublishWriterTask?.Status}"); Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionWriter:{this.ConnectionWriterTask?.Status}"); @@ -126,12 +126,11 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationTo } // Logger.Trace($"{this.Options.ClientId}-(PW)- {this.OutgoingPublishQueue.Count} publish packets waiting to be sent."); - var receiveMaximum = this.ConnectionProperties.ReceiveMaximum ?? 65535; - if (this.transactionQueue.Count >= receiveMaximum) + if (this.TransactionQueue.Count >= receiveMaximum) { Logger.Debug($"The Maximum number of publishes have been sent to broker. Waiting for existing transactions to complete."); - await Task.Delay(2000).ConfigureAwait(false); + await Task.Delay(10).ConfigureAwait(false); continue; } @@ -144,7 +143,7 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationTo publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { // QoS > 0 - Add to transaction queue - if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false) + if (!this.TransactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket })) { Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an outgoing QoS {publishPacket.Message.QoS} publish ."); continue; @@ -167,12 +166,6 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationTo } this.lastCommunicationTimer.Restart(); - - if (this.transactionQueue.Count >= (this.ConnectionProperties.ReceiveMaximum ?? 65535)) - { - break; - } - } else { @@ -210,7 +203,6 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => } // Logger.Trace($"{this.Options.ClientId}-(W)- {this.SendQueue.Count} packets waiting to be sent."); - if (this.SendQueue.TryDequeue(out var packet)) { FlushResult writeResult = default; @@ -348,7 +340,7 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => while (buffer.Length > 0) { - if (PacketDecoder.TryDecode(buffer, out var decodedPacket, out var consumed) == false) + if (!PacketDecoder.TryDecode(buffer, out var decodedPacket, out var consumed)) { if (decodedPacket is MalformedPacket) { @@ -385,7 +377,8 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => } else { - Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name}. Adding to receivedQueue."); + Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name} id: {decodedPacket.PacketIdentifier}. Adding to receivedQueue."); + // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync this.ReceivedQueue.Enqueue(decodedPacket); } @@ -417,7 +410,6 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok } // Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.ReceivedQueue.Count} received packets currently waiting to be processed."); - if (this.ReceivedQueue.TryDequeue(out var packet)) { if (this.Options.ClientMaximumPacketSize != null) @@ -448,10 +440,6 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier} {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); Logger.Warn($"We shouldn't get the disconnect here - Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); throw new HiveMQttClientException("Received Disconnect packet in ReceivedPacketsHandlerAsync"); - // Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - // await this.HandleDisconnectionAsync(false).ConfigureAwait(false); - // this.OnDisconnectReceivedEventLauncher(disconnectPacket); - break; case PingRespPacket pingRespPacket: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); this.OnPingRespReceivedEventLauncher(pingRespPacket); @@ -522,7 +510,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) var publishQoS2Chain = new List { publishPacket, pubRecResponse }; // FIXME: Wait for QoS 2 transaction to complete before calling OnMessageReceivedEventLauncher??? - if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, publishQoS2Chain) == false) + if (!this.TransactionQueue.TryAdd(publishPacket.PacketIdentifier, publishQoS2Chain)) { Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an incoming QoS {publishPacket.Message.QoS} publish ."); pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse; @@ -545,7 +533,7 @@ internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) this.OnPubAckReceivedEventLauncher(pubAckPacket); // Remove the transaction chain from the transaction queue - if (this.transactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain)) + if (this.TransactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain)) { var publishPacket = (PublishPacket)publishQoS1Chain.First(); @@ -568,7 +556,7 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) this.OnPubRecReceivedEventLauncher(pubRecPacket); // Find the QoS2 transaction chain for this packet identifier - if (this.transactionQueue.TryGetValue(pubRecPacket.PacketIdentifier, out var originalPublishQoS2Chain)) + if (this.TransactionQueue.TryGetValue(pubRecPacket.PacketIdentifier, out var originalPublishQoS2Chain)) { var originalPublishPacket = (PublishPacket)originalPublishQoS2Chain.First(); @@ -584,7 +572,7 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) }; // Update the chain in the queue - if (this.transactionQueue.TryUpdate(pubRecPacket.PacketIdentifier, newPublishQoS2Chain, originalPublishQoS2Chain) == false) + if (!this.TransactionQueue.TryUpdate(pubRecPacket.PacketIdentifier, newPublishQoS2Chain, originalPublishQoS2Chain)) { Logger.Warn($"QoS2: Couldn't update PubRec --> PubRel QoS2 Chain for packet identifier {pubRecPacket.PacketIdentifier}."); } @@ -609,7 +597,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRel id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}"); this.OnPubRelReceivedEventLauncher(pubRelPacket); - if (this.transactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var originalPublishQoS2Chain)) + if (this.TransactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var originalPublishQoS2Chain)) { var originalPublishPacket = (PublishPacket)originalPublishQoS2Chain.First(); @@ -617,7 +605,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) var pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.Success); // This QoS2 transaction chain is done. Remove it from the transaction queue. - if (this.transactionQueue.TryRemove(pubRelPacket.PacketIdentifier, out var publishQoS2Chain)) + if (this.TransactionQueue.TryRemove(pubRelPacket.PacketIdentifier, out var publishQoS2Chain)) { // Update the chain with the latest packets for the event launcher publishQoS2Chain.Add(pubRelPacket); @@ -655,7 +643,7 @@ internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket) this.OnPubCompReceivedEventLauncher(pubCompPacket); // Remove the QoS 2 transaction chain from the queue - if (this.transactionQueue.Remove(pubCompPacket.PacketIdentifier, out var publishQoS2Chain)) + if (this.TransactionQueue.Remove(pubCompPacket.PacketIdentifier, out var publishQoS2Chain)) { var originalPublishPacket = (PublishPacket)publishQoS2Chain.First(); diff --git a/Source/HiveMQtt/MQTT5/PacketDecoder.cs b/Source/HiveMQtt/MQTT5/PacketDecoder.cs index 127de369..7b6c63e1 100644 --- a/Source/HiveMQtt/MQTT5/PacketDecoder.cs +++ b/Source/HiveMQtt/MQTT5/PacketDecoder.cs @@ -80,7 +80,7 @@ public static bool TryDecode(ReadOnlySequence buffer, out ControlPacket de consumed = buffer.GetPosition(packetLength); decodedPacket = packet; - Logger.Trace($"PacketDecoder: Decoded Packet: consumed={consumed.GetInteger()}, packet={packet} id={packet.PacketIdentifier}"); + // Logger.Trace($"PacketDecoder: Decoded Packet: consumed={consumed.GetInteger()}, packet={packet} id={packet.PacketIdentifier}"); return true; } catch (Exception ex) diff --git a/Source/HiveMQtt/NLog.config b/Source/HiveMQtt/NLog.config index 9408b03f..35995150 100644 --- a/Source/HiveMQtt/NLog.config +++ b/Source/HiveMQtt/NLog.config @@ -34,6 +34,6 @@ Write all events with minimal level of Debug (So Debug, Info, Warn, Error and Fatal, but not Trace) to "logfile" --> - + diff --git a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs index 952cb00b..3605a60c 100644 --- a/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs +++ b/Tests/HiveMQtt.Test/HiveMQClient/PublishTest.cs @@ -177,7 +177,7 @@ public async Task PublishPayloadFormatIndicatorAsync() Assert.True(disconnectResult); } - [Fact (Skip = "Inconsistent on Github Actions")] + [Fact] public async Task ThreeNodeQoS0ChainedPublishesAsync() { var client1 = new HiveMQClient(); // publish message @@ -251,9 +251,9 @@ async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs even Assert.Empty(client2.SendQueue); Assert.Empty(client3.SendQueue); - Assert.Empty(client1.transactionQueue); - Assert.Empty(client2.transactionQueue); - Assert.Empty(client3.transactionQueue); + Assert.Empty(client1.TransactionQueue); + Assert.Empty(client2.TransactionQueue); + Assert.Empty(client3.TransactionQueue); // All done, disconnect all clients var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false); @@ -264,7 +264,7 @@ async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs even Assert.True(disconnectResult); } - [Fact (Skip = "Inconsistent on Github Actions")] + [Fact] public async Task ThreeNodeQoS1ChainedPublishesAsync() { var client1 = new HiveMQClient(); // publish message @@ -340,9 +340,9 @@ async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs even Assert.Empty(client2.SendQueue); Assert.Empty(client3.SendQueue); - Assert.Empty(client1.transactionQueue); - Assert.Empty(client2.transactionQueue); - Assert.Empty(client3.transactionQueue); + Assert.Empty(client1.TransactionQueue); + Assert.Empty(client2.TransactionQueue); + Assert.Empty(client3.TransactionQueue); // All done, disconnect all clients var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false); @@ -426,9 +426,9 @@ async void Client3MessageHandler(object? sender, OnMessageReceivedEventArgs even Assert.Empty(client2.SendQueue); Assert.Empty(client3.SendQueue); - Assert.Empty(client1.transactionQueue); - Assert.Empty(client2.transactionQueue); - Assert.Empty(client3.transactionQueue); + Assert.Empty(client1.TransactionQueue); + Assert.Empty(client2.TransactionQueue); + Assert.Empty(client3.TransactionQueue); // All done, disconnect all clients var disconnectResult = await client1.DisconnectAsync().ConfigureAwait(false);