diff --git a/README.md b/README.md index b652379e..83d803ee 100644 --- a/README.md +++ b/README.md @@ -151,6 +151,42 @@ For more examples that you can easily copy/paste, see our [Examples](https://git There is even an https://github.com/hivemq/hivemq-mqtt-client-dotnet/tree/main/Examples/HiveMQtt-CLI to demonstrate usage of the package. +## Configuration + +### Logging + +The HiveMQtt package uses [NLog](https://github.com/NLog/NLog) and can be configured with a configuration file (`NLog.config`). Having this file in the same directory of your executable will configure the HiveMQtt logger to output as configured: + +```xml + + + + + + + + + + + + + + +``` + +Setting `minlevel` to `Trace` will output all activity in the HiveMQtt package down to packet and event handling. Using this level will produce a lot of output such as the following: + +```log +2023-10-04 16:56:54.9373|TRACE|HiveMQtt.Client.HiveMQClient|BeforeConnectEventLauncher +2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|7: TrafficInflowProcessor Starting...Connecting +2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|9: TrafficOutflowProcessor Starting...Connecting +2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|--> ConnectPacket +2023-10-04 16:56:55.0128|TRACE|HiveMQtt.Client.HiveMQClient|OnConnectSentEventLauncher +2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|<-- ConnAck +2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|OnConnAckReceivedEventLauncher +2023-10-04 16:56:55.0379|TRACE|HiveMQtt.Client.HiveMQClient|AfterConnectEventLauncher +``` ## Other MQTT Clients diff --git a/Source/HiveMQtt/Client/Events/BeforeDisconnectEventArgs.cs b/Source/HiveMQtt/Client/Events/BeforeDisconnectEventArgs.cs index 06240826..b43c637b 100644 --- a/Source/HiveMQtt/Client/Events/BeforeDisconnectEventArgs.cs +++ b/Source/HiveMQtt/Client/Events/BeforeDisconnectEventArgs.cs @@ -15,12 +15,9 @@ */ namespace HiveMQtt.Client.Events; -using HiveMQtt.Client.Options; - /// /// Event arguments for the event. /// This event is called before a disconnect is sent to the broker. -/// contains the options of the disconnect operation. /// public class BeforeDisconnectEventArgs : EventArgs { @@ -28,5 +25,4 @@ public BeforeDisconnectEventArgs() { } - public HiveMQClientOptions Options { get; set; } } diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index 767bf04b..9855980e 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -34,6 +34,8 @@ namespace HiveMQtt.Client; /// public partial class HiveMQClient : IDisposable, IHiveMQClient { + private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger(); + private ConnectState connectState = ConnectState.Disconnected; public HiveMQClient(HiveMQClientOptions? options = null) @@ -84,10 +86,10 @@ public async Task ConnectAsync() { connAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); } - catch (System.TimeoutException) + catch (TimeoutException) { - // log.Error(string.Format("Connect timeout. No response received in time.", ex); - throw; + this.connectState = ConnectState.Disconnected; + throw new HiveMQttClientException("Connect timeout. No response received in time."); } finally { @@ -148,7 +150,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) { disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); } - catch (System.TimeoutException) + catch (TimeoutException) { // Does it matter? We're disconnecting anyway. } @@ -161,6 +163,9 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) // Close the socket this.CloseSocket(); + // Fire the corresponding event + this.AfterDisconnectEventLauncher(true); + this.connectState = ConnectState.Disconnected; // Clear the send queue @@ -284,7 +289,7 @@ public async Task SubscribeAsync(SubscribeOptions options) // FIXME: Validate that the packet identifier matches } - catch (System.TimeoutException ex) + catch (TimeoutException ex) { // log.Error(string.Format("Connect timeout. No response received in time.", ex); throw ex; @@ -358,7 +363,7 @@ public async Task UnsubscribeAsync(List subscri // FIXME: Validate that the packet identifier matches } - catch (System.TimeoutException) + catch (TimeoutException) { // log.Error(string.Format("Connect timeout. No response received in time.", ex); throw; diff --git a/Source/HiveMQtt/Client/HiveMQClientEvents.cs b/Source/HiveMQtt/Client/HiveMQClientEvents.cs index 9c22cebc..ed7c5bc8 100644 --- a/Source/HiveMQtt/Client/HiveMQClientEvents.cs +++ b/Source/HiveMQtt/Client/HiveMQClientEvents.cs @@ -17,7 +17,6 @@ namespace HiveMQtt.Client; using System; using System.Diagnostics; -using System.Security.Claims; using HiveMQtt.Client.Events; using HiveMQtt.Client.Options; using HiveMQtt.Client.Results; @@ -39,7 +38,7 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient protected virtual void BeforeConnectEventLauncher(HiveMQClientOptions options) { var eventArgs = new BeforeConnectEventArgs(options); - Trace.WriteLine("BeforeConnectEventLauncher"); + Logger.Trace("BeforeConnectEventLauncher"); this.BeforeConnect?.Invoke(this, eventArgs); } @@ -51,7 +50,7 @@ protected virtual void BeforeConnectEventLauncher(HiveMQClientOptions options) protected virtual void AfterConnectEventLauncher(ConnectResult results) { var eventArgs = new AfterConnectEventArgs(results); - Trace.WriteLine("AfterConnectEventLauncher"); + Logger.Trace("AfterConnectEventLauncher"); this.AfterConnect?.Invoke(this, eventArgs); } @@ -63,7 +62,7 @@ protected virtual void AfterConnectEventLauncher(ConnectResult results) protected virtual void BeforeDisconnectEventLauncher() { var eventArgs = new BeforeDisconnectEventArgs(); - Trace.WriteLine("BeforeDisconnectEventLauncher"); + Logger.Trace("BeforeDisconnectEventLauncher"); this.BeforeDisconnect?.Invoke(this, eventArgs); } @@ -75,7 +74,7 @@ protected virtual void BeforeDisconnectEventLauncher() protected virtual void AfterDisconnectEventLauncher(bool clean = false) { var eventArgs = new AfterDisconnectEventArgs(clean); - Trace.WriteLine("AfterDisconnectEventLauncher"); + Logger.Trace("AfterDisconnectEventLauncher"); this.AfterDisconnect?.Invoke(this, eventArgs); } @@ -87,7 +86,7 @@ protected virtual void AfterDisconnectEventLauncher(bool clean = false) protected virtual void BeforeSubscribeEventLauncher(SubscribeOptions options) { var eventArgs = new BeforeSubscribeEventArgs(options); - Trace.WriteLine("BeforeSubscribeEventLauncher"); + Logger.Trace("BeforeSubscribeEventLauncher"); this.BeforeSubscribe?.Invoke(this, eventArgs); } @@ -99,7 +98,7 @@ protected virtual void BeforeSubscribeEventLauncher(SubscribeOptions options) protected virtual void AfterSubscribeEventLauncher(SubscribeResult results) { var eventArgs = new AfterSubscribeEventArgs(results); - Trace.WriteLine("AfterSubscribeEventLauncher"); + Logger.Trace("AfterSubscribeEventLauncher"); this.AfterSubscribe?.Invoke(this, eventArgs); } @@ -111,7 +110,7 @@ protected virtual void AfterSubscribeEventLauncher(SubscribeResult results) protected virtual void BeforeUnsubscribeEventLauncher(List subscriptions) { var eventArgs = new BeforeUnsubscribeEventArgs(subscriptions); - Trace.WriteLine("BeforeUnsubscribeEventLauncher"); + Logger.Trace("BeforeUnsubscribeEventLauncher"); this.BeforeUnsubscribe?.Invoke(this, eventArgs); } @@ -123,7 +122,7 @@ protected virtual void BeforeUnsubscribeEventLauncher(List subscri protected virtual void AfterUnsubscribeEventLauncher(UnsubscribeResult results) { var eventArgs = new AfterUnsubscribeEventArgs(results); - Trace.WriteLine("AfterUnsubscribeEventLauncher"); + Logger.Trace("AfterUnsubscribeEventLauncher"); this.AfterUnsubscribe?.Invoke(this, eventArgs); } @@ -135,7 +134,7 @@ protected virtual void AfterUnsubscribeEventLauncher(UnsubscribeResult results) protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) { var eventArgs = new OnMessageReceivedEventArgs(packet.Message); - Trace.WriteLine("OnMessageReceivedEventLauncher"); + Logger.Trace("OnMessageReceivedEventLauncher"); this.OnMessageReceived?.Invoke(this, eventArgs); } @@ -151,7 +150,7 @@ protected virtual void OnMessageReceivedEventLauncher(PublishPacket packet) protected virtual void OnConnectSentEventLauncher(ConnectPacket packet) { var eventArgs = new OnConnectSentEventArgs(packet); - Trace.WriteLine("OnConnectSentEventLauncher"); + Logger.Trace("OnConnectSentEventLauncher"); this.OnConnectSent?.Invoke(this, eventArgs); } @@ -163,7 +162,7 @@ protected virtual void OnConnectSentEventLauncher(ConnectPacket packet) protected virtual void OnConnAckReceivedEventLauncher(ConnAckPacket packet) { var eventArgs = new OnConnAckReceivedEventArgs(packet); - Trace.WriteLine("OnConnAckReceivedEventLauncher"); + Logger.Trace("OnConnAckReceivedEventLauncher"); this.OnConnAckReceived?.Invoke(this, eventArgs); } @@ -175,7 +174,7 @@ protected virtual void OnConnAckReceivedEventLauncher(ConnAckPacket packet) protected virtual void OnDisconnectSentEventLauncher(DisconnectPacket packet) { var eventArgs = new OnDisconnectSentEventArgs(packet); - Trace.WriteLine("OnDisconnectSentEventLauncher"); + Logger.Trace("OnDisconnectSentEventLauncher"); this.OnDisconnectSent?.Invoke(this, eventArgs); } @@ -187,7 +186,7 @@ protected virtual void OnDisconnectSentEventLauncher(DisconnectPacket packet) protected virtual void OnDisconnectReceivedEventLauncher(DisconnectPacket packet) { var eventArgs = new OnDisconnectReceivedEventArgs(packet); - Trace.WriteLine("OnDisconnectReceivedEventLauncher: ReasonCode: " + packet.DisconnectReasonCode + " ReasonString: " + packet.Properties.ReasonString); + Logger.Trace("OnDisconnectReceivedEventLauncher: ReasonCode: " + packet.DisconnectReasonCode + " ReasonString: " + packet.Properties.ReasonString); this.OnDisconnectReceived?.Invoke(this, eventArgs); } @@ -199,7 +198,7 @@ protected virtual void OnDisconnectReceivedEventLauncher(DisconnectPacket packet protected virtual void OnPingReqSentEventLauncher(PingReqPacket packet) { var eventArgs = new OnPingReqSentEventArgs(packet); - Trace.WriteLine("OnPingReqSentEventLauncher"); + Logger.Trace("OnPingReqSentEventLauncher"); this.OnPingReqSent?.Invoke(this, eventArgs); } @@ -211,7 +210,7 @@ protected virtual void OnPingReqSentEventLauncher(PingReqPacket packet) protected virtual void OnPingRespReceivedEventLauncher(PingRespPacket packet) { var eventArgs = new OnPingRespReceivedEventArgs(packet); - Trace.WriteLine("OnPingRespReceivedEventLauncher"); + Logger.Trace("OnPingRespReceivedEventLauncher"); this.OnPingRespReceived?.Invoke(this, eventArgs); } @@ -223,7 +222,7 @@ protected virtual void OnPingRespReceivedEventLauncher(PingRespPacket packet) protected virtual void OnSubscribeSentEventLauncher(SubscribePacket packet) { var eventArgs = new OnSubscribeSentEventArgs(packet); - Trace.WriteLine("OnSubscribeSentEventLauncher"); + Logger.Trace("OnSubscribeSentEventLauncher"); this.OnSubscribeSent?.Invoke(this, eventArgs); } @@ -235,7 +234,7 @@ protected virtual void OnSubscribeSentEventLauncher(SubscribePacket packet) protected virtual void OnSubAckReceivedEventLauncher(SubAckPacket packet) { var eventArgs = new OnSubAckReceivedEventArgs(packet); - Trace.WriteLine("OnSubAckReceivedEventLauncher"); + Logger.Trace("OnSubAckReceivedEventLauncher"); this.OnSubAckReceived?.Invoke(this, eventArgs); } @@ -247,7 +246,7 @@ protected virtual void OnSubAckReceivedEventLauncher(SubAckPacket packet) protected virtual void OnUnsubscribeSentEventLauncher(UnsubscribePacket packet) { var eventArgs = new OnUnsubscribeSentEventArgs(packet); - Trace.WriteLine("OnUnsubscribeSentEventLauncher"); + Logger.Trace("OnUnsubscribeSentEventLauncher"); this.OnUnsubscribeSent?.Invoke(this, eventArgs); } @@ -259,7 +258,7 @@ protected virtual void OnUnsubscribeSentEventLauncher(UnsubscribePacket packet) protected virtual void OnUnsubAckReceivedEventLauncher(UnsubAckPacket packet) { var eventArgs = new OnUnsubAckReceivedEventArgs(packet); - Trace.WriteLine("OnUnsubAckReceivedEventLauncher"); + Logger.Trace("OnUnsubAckReceivedEventLauncher"); this.OnUnsubAckReceived?.Invoke(this, eventArgs); } @@ -271,7 +270,7 @@ protected virtual void OnUnsubAckReceivedEventLauncher(UnsubAckPacket packet) protected virtual void OnPublishReceivedEventLauncher(PublishPacket packet) { var eventArgs = new OnPublishReceivedEventArgs(packet); - Trace.WriteLine("OnPublishReceivedEventLauncher"); + Logger.Trace("OnPublishReceivedEventLauncher"); this.OnPublishReceived?.Invoke(this, eventArgs); } @@ -283,7 +282,7 @@ protected virtual void OnPublishReceivedEventLauncher(PublishPacket packet) protected virtual void OnPublishSentEventLauncher(PublishPacket packet) { var eventArgs = new OnPublishSentEventArgs(packet); - Trace.WriteLine("OnPublishSentEventLauncher"); + Logger.Trace("OnPublishSentEventLauncher"); this.OnPublishSent?.Invoke(this, eventArgs); } @@ -295,7 +294,7 @@ protected virtual void OnPublishSentEventLauncher(PublishPacket packet) protected virtual void OnPubAckReceivedEventLauncher(PubAckPacket packet) { var eventArgs = new OnPubAckReceivedEventArgs(packet); - Trace.WriteLine("OnPubAckReceivedEventLauncher"); + Logger.Trace("OnPubAckReceivedEventLauncher"); this.OnPubAckReceived?.Invoke(this, eventArgs); } @@ -307,7 +306,7 @@ protected virtual void OnPubAckReceivedEventLauncher(PubAckPacket packet) protected virtual void OnPubAckSentEventLauncher(PubAckPacket packet) { var eventArgs = new OnPubAckSentEventArgs(packet); - Trace.WriteLine("OnPubAckSentEventLauncher"); + Logger.Trace("OnPubAckSentEventLauncher"); this.OnPubAckSent?.Invoke(this, eventArgs); } @@ -319,7 +318,7 @@ protected virtual void OnPubAckSentEventLauncher(PubAckPacket packet) protected virtual void OnPubRecReceivedEventLauncher(PubRecPacket packet) { var eventArgs = new OnPubRecReceivedEventArgs(packet); - Trace.WriteLine("OnPubRecReceivedEventLauncher"); + Logger.Trace("OnPubRecReceivedEventLauncher"); this.OnPubRecReceived?.Invoke(this, eventArgs); } @@ -331,7 +330,7 @@ protected virtual void OnPubRecReceivedEventLauncher(PubRecPacket packet) protected virtual void OnPubRecSentEventLauncher(PubRecPacket packet) { var eventArgs = new OnPubRecSentEventArgs(packet); - Trace.WriteLine("OnPubRecSentEventLauncher"); + Logger.Trace("OnPubRecSentEventLauncher"); this.OnPubRecSent?.Invoke(this, eventArgs); } @@ -343,7 +342,7 @@ protected virtual void OnPubRecSentEventLauncher(PubRecPacket packet) protected virtual void OnPubRelReceivedEventLauncher(PubRelPacket packet) { var eventArgs = new OnPubRelReceivedEventArgs(packet); - Trace.WriteLine("OnPubRelReceivedEventLauncher"); + Logger.Trace("OnPubRelReceivedEventLauncher"); this.OnPubRelReceived?.Invoke(this, eventArgs); } @@ -355,7 +354,7 @@ protected virtual void OnPubRelReceivedEventLauncher(PubRelPacket packet) protected virtual void OnPubRelSentEventLauncher(PubRelPacket packet) { var eventArgs = new OnPubRelSentEventArgs(packet); - Trace.WriteLine("OnPubRelSentEventLauncher"); + Logger.Trace("OnPubRelSentEventLauncher"); this.OnPubRelSent?.Invoke(this, eventArgs); } @@ -367,7 +366,7 @@ protected virtual void OnPubRelSentEventLauncher(PubRelPacket packet) protected virtual void OnPubCompReceivedEventLauncher(PubCompPacket packet) { var eventArgs = new OnPubCompReceivedEventArgs(packet); - Trace.WriteLine("PubCompReceivedEventLauncher"); + Logger.Trace("PubCompReceivedEventLauncher"); this.OnPubCompReceived?.Invoke(this, eventArgs); } @@ -379,7 +378,7 @@ protected virtual void OnPubCompReceivedEventLauncher(PubCompPacket packet) protected virtual void OnPubCompSentEventLauncher(PubCompPacket packet) { var eventArgs = new OnPubCompSentEventArgs(packet); - Trace.WriteLine("PubCompSentEventLauncher"); + Logger.Trace("PubCompSentEventLauncher"); this.OnPubCompSent?.Invoke(this, eventArgs); } } diff --git a/Source/HiveMQtt/Client/HiveMQClientLogger.cs b/Source/HiveMQtt/Client/HiveMQClientLogger.cs deleted file mode 100644 index 3f5ab853..00000000 --- a/Source/HiveMQtt/Client/HiveMQClientLogger.cs +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright 2022-present HiveMQ and the HiveMQ Community - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -namespace HiveMQtt.Client; - -using Microsoft.Extensions.Logging; - -/// -public partial class HiveMQClient : IDisposable, IHiveMQClient -{ - private ILogger _logger; - - public void AttachLogger(ILogger logger) => this._logger = logger; -} diff --git a/Source/HiveMQtt/Client/HiveMQClientSocket.cs b/Source/HiveMQtt/Client/HiveMQClientSocket.cs index 7fd49ebb..d3ec55df 100644 --- a/Source/HiveMQtt/Client/HiveMQClientSocket.cs +++ b/Source/HiveMQtt/Client/HiveMQClientSocket.cs @@ -101,7 +101,15 @@ internal async Task ConnectSocketAsync() IPEndPoint ipEndPoint = new(ipAddress, this.Options.Port); this.socket = new(ipEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - await this.socket.ConnectAsync(ipEndPoint).ConfigureAwait(false); + + try + { + await this.socket.ConnectAsync(ipEndPoint).ConfigureAwait(false); + } + catch (SocketException socketException) + { + throw new HiveMQttClientException("Failed to connect to broker", socketException); + } var socketConnected = this.socket.Connected; @@ -131,7 +139,7 @@ internal async Task ConnectSocketAsync() _ = this.TrafficOutflowProcessorAsync(this.outFlowCancellationToken); _ = this.TrafficInflowProcessorAsync(this.infoFlowCancellationToken); - // Console.WriteLine($"Socket connected to {this.socket.RemoteEndPoint}"); + Logger.Trace($"Socket connected to {this.socket.RemoteEndPoint}"); return socketConnected; } diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index c95af7a3..c985db44 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -32,13 +32,14 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient // The outgoing packet queue. Packets queued to be sent. private readonly ConcurrentQueue sendQueue = new(); - // Transactional packets indexed by packet identifer + // Transactional packets indexed by packet identifier private readonly ConcurrentDictionary> transactionQueue = new(); /// /// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue. /// - private Task TrafficOutflowProcessorAsync(CancellationToken cancellationToken) => Task.Run(async () => + private Task TrafficOutflowProcessorAsync(CancellationToken cancellationToken) => Task.Run( + async () => { var stopWatch = new Stopwatch(); var keepAlivePeriod = this.Options.KeepAlive / 2; @@ -46,7 +47,7 @@ private Task TrafficOutflowProcessorAsync(CancellationToken cancellationTo stopWatch.Start(); - Trace.WriteLine($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Starting...{this.connectState}"); + Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Starting...{this.connectState}"); while (this.connectState != ConnectState.Disconnected) { @@ -55,6 +56,7 @@ private Task TrafficOutflowProcessorAsync(CancellationToken cancellationTo if (elapsed > TimeSpan.FromSeconds(keepAlivePeriod)) { // Send PingReq + Logger.Trace("--> PingReq"); var writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); this.OnPingReqSentEventLauncher(new PingReqPacket()); stopWatch.Restart(); @@ -76,32 +78,32 @@ private Task TrafficOutflowProcessorAsync(CancellationToken cancellationTo if (this.sendQueue.TryDequeue(out var packet)) { FlushResult writeResult; - // FIXME: Handle writeResult.IsCanceled and writeResult.IsCompleted + switch (packet) { // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time. case ConnectPacket connectPacket: - Trace.WriteLine("--> ConnectPacket"); + Logger.Trace("--> ConnectPacket"); writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false); this.OnConnectSentEventLauncher(connectPacket); break; case DisconnectPacket disconnectPacket: - Trace.WriteLine("--> DisconnectPacket"); + Logger.Trace("--> DisconnectPacket"); writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false); this.OnDisconnectSentEventLauncher(disconnectPacket); break; case SubscribePacket subscribePacket: - Trace.WriteLine("--> SubscribePacket"); + Logger.Trace("--> SubscribePacket"); writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false); this.OnSubscribeSentEventLauncher(subscribePacket); break; case UnsubscribePacket unsubscribePacket: - Trace.WriteLine("--> UnsubscribePacket"); + Logger.Trace("--> UnsubscribePacket"); writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); this.OnUnsubscribeSentEventLauncher(unsubscribePacket); break; case PublishPacket publishPacket: - Trace.WriteLine("--> PublishPacket"); + Logger.Trace("--> PublishPacket"); if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery || publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) { @@ -119,28 +121,28 @@ private Task TrafficOutflowProcessorAsync(CancellationToken cancellationTo case PubAckPacket pubAckPacket: // This is in response to a received Publish packet. Communication chain management // was done in the receiver code. Just send the response. - Trace.WriteLine("--> PubAckPacket"); + Logger.Trace("--> PubAckPacket"); writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); this.OnPubAckSentEventLauncher(pubAckPacket); break; case PubRecPacket pubRecPacket: // This is in response to a received Publish packet. Communication chain management // was done in the receiver code. Just send the response. - Trace.WriteLine("--> PubRecPacket"); + Logger.Trace("--> PubRecPacket"); writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); this.OnPubRecSentEventLauncher(pubRecPacket); break; case PubRelPacket pubRelPacket: // This is in response to a received PubRec packet. Communication chain management // was done in the receiver code. Just send the response. - Trace.WriteLine("--> PubRelPacket"); + Logger.Trace("--> PubRelPacket"); writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false); this.OnPubRelSentEventLauncher(pubRelPacket); break; case PubCompPacket pubCompPacket: // This is in response to a received PubRel packet. Communication chain management // was done in the receiver code. Just send the response. - Trace.WriteLine("--> PubCompPacket"); + Logger.Trace("--> PubCompPacket"); writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); this.OnPubCompSentEventLauncher(pubCompPacket); break; @@ -152,46 +154,78 @@ private Task TrafficOutflowProcessorAsync(CancellationToken cancellationTo */ default: - Trace.WriteLine("--> Unknown packet type"); + Logger.Trace("--> Unknown packet type"); throw new NotImplementedException(); } + if (writeResult.IsCanceled) + { + Logger.Trace("TrafficOutflowProcessor Write Canceled"); + break; + } + + if (writeResult.IsCompleted) + { + Logger.Trace("TrafficOutflowProcessor IsCompleted: end of the stream"); + break; + } + stopWatch.Restart(); } } // while - Trace.WriteLine($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Exiting...{this.connectState}"); + Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Exiting...{this.connectState}"); return true; }, cancellationToken); /// /// Asynchronous background task that handles the incoming traffic of packets. /// - private Task TrafficInflowProcessorAsync(CancellationToken cancellationToken) => Task.Run(async () => + private Task TrafficInflowProcessorAsync(CancellationToken cancellationToken) => Task.Run( + async () => { - Trace.WriteLine($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Starting...{this.connectState}"); + Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Starting...{this.connectState}"); ReadResult readResult; while (this.connectState is ConnectState.Connecting or ConnectState.Connected) { + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace("TrafficInflowProcessor Canceled"); + break; + } + readResult = await this.ReadAsync().ConfigureAwait(false); if (readResult.IsCanceled) { - Trace.WriteLine("TrafficInflowProcessor Read Canceled"); + Logger.Trace("TrafficInflowProcessor Read Canceled"); break; } if (readResult.IsCompleted) { - Trace.WriteLine("TrafficInflowProcessor IsCompleted: end of the stream"); - break; + // This is an unexpected exit and may be due to a network failure. + Logger.Trace("TrafficInflowProcessor IsCompleted: end of the streamx"); + + if (this.connectState == ConnectState.Connected) + { + Logger.Trace("TrafficInflowProcessor IsCompleted: was unexpected"); + this.connectState = ConnectState.Disconnected; + + // Launch the AfterDisconnect event with a clean disconnect set to false. + this.AfterDisconnectEventLauncher(false); + + this.cancellationSource.Cancel(); + return false; + } + return true; } if (readResult.Buffer.IsEmpty) { - Trace.WriteLine("TrafficInflowProcessor Read Buffer Empty"); + Logger.Trace("TrafficInflowProcessor Read Buffer Empty"); continue; } @@ -204,7 +238,7 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok } catch (Exception ex) { - Trace.WriteLine($"TrafficInflowProcessor Decoding Exception: {ex.Message}"); + Logger.Trace($"TrafficInflowProcessor Decoding Exception: {ex.Message}"); throw; } @@ -214,7 +248,7 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok } else if (packet is MalformedPacket) { - Trace.WriteLine("TrafficInflowProcessor Malformed Packet Detected !!! Skipping..."); + Logger.Trace("TrafficInflowProcessor Malformed Packet Detected !!! Skipping..."); this.reader?.AdvanceTo(consumed); continue; } @@ -225,153 +259,184 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok switch (packet) { case ConnAckPacket connAckPacket: - Trace.WriteLine("<-- ConnAck"); + Logger.Trace("<-- ConnAck"); this.OnConnAckReceivedEventLauncher(connAckPacket); break; case DisconnectPacket disconnectPacket: - Trace.WriteLine("<-- Disconnect"); + Logger.Trace("<-- Disconnect"); this.OnDisconnectReceivedEventLauncher(disconnectPacket); break; case PingRespPacket pingRespPacket: - Trace.WriteLine("<-- PingResp"); + Logger.Trace("<-- PingResp"); this.OnPingRespReceivedEventLauncher(pingRespPacket); break; case SubAckPacket subAckPacket: - Trace.WriteLine("<-- SubAck"); + Logger.Trace("<-- SubAck"); this.OnSubAckReceivedEventLauncher(subAckPacket); break; case UnsubAckPacket unsubAckPacket: - Trace.WriteLine("<-- UnsubAck"); + Logger.Trace("<-- UnsubAck"); this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); break; case PublishPacket publishPacket: - Trace.WriteLine("<-- Publish"); - if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery) - { - // We've received a QoS 1 publish. Send a PubAck. - var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); - this.sendQueue.Enqueue(pubAckResponse); - } - else if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) - { - // We've received a QoS 2 publish. Send a PubRec and add to transaction list. - var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success); - var transaction = new List { publishPacket, pubRecResponse }; - - if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false) - { - // FIXME: Log, trace to assist debugging - pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse; - } - - this.sendQueue.Enqueue(pubRecResponse); - } - - this.OnMessageReceivedEventLauncher(publishPacket); + this.HandleIncomingPublishPacket(publishPacket); break; case PubAckPacket pubAckPacket: - Trace.WriteLine("<-- PubAck"); - if (this.transactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain)) - { - var publishPacket = (PublishPacket)publishQoS1Chain.First(); - - // Trigger the packet specific event - publishPacket.OnPublishQoS1CompleteEventLauncher(pubAckPacket); - } - else - { - throw new HiveMQttClientException("Received PubAck with an unknown packet identifier: ¯\\_(ツ)_/¯"); - } - - this.OnPubAckReceivedEventLauncher(pubAckPacket); - + this.HandleIncomingPubAckPacket(pubAckPacket); break; case PubRecPacket pubRecPacket: - Trace.WriteLine("<-- PubRec"); - this.OnPubRecReceivedEventLauncher(pubRecPacket); - PubRelPacket pubRelResponsePacket; - if (this.transactionQueue.TryGetValue(pubRecPacket.PacketIdentifier, out var publishQoS2Chain)) - { - var publishPacket = (PublishPacket)publishQoS2Chain.First(); - - // Trigger the packet specific event - publishPacket.OnPublishQoS2CompleteEventLauncher(pubRecPacket); - - // Add the PUBREC to the chain - publishQoS2Chain.Add(pubRecPacket); - - // Send and add a PUBREL to the chain - pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.Success); - this.sendQueue.Enqueue(pubRelResponsePacket); - publishQoS2Chain.Add(pubRelResponsePacket); - } - else - { - // Send a PUBREL with PacketIdentifierNotFound - pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.PacketIdentifierNotFound); - this.sendQueue.Enqueue(pubRelResponsePacket); - } - + this.HandleIncomingPubRecPacket(pubRecPacket); break; case PubRelPacket pubRelPacket: - Trace.WriteLine("<-- PubRel"); - PubCompPacket pubCompResponsePacket; - if (this.transactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var pubRelQoS2Chain)) - { - // Send a PUBCOMP - pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.Success); - } - else - { - // Send a PUBCOMP with PacketIdentifierNotFound - pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.PacketIdentifierNotFound); - } - - this.sendQueue.Enqueue(pubCompResponsePacket); - this.OnPubRelReceivedEventLauncher(pubRelPacket); + this.HandleIncomingPubRelPacket(pubRelPacket); break; case PubCompPacket pubCompPacket: - Trace.WriteLine("<-- PubComp"); - if (!this.transactionQueue.Remove(pubCompPacket.PacketIdentifier, out var pubcompQoS2Chain)) - { - throw new HiveMQttClientException("Received PubComp with an unknown packet identifier: ¯\\_(ツ)_/¯"); - } - - this.OnPubCompReceivedEventLauncher(pubCompPacket); - + this.HandleIncomingPubCompPacket(pubCompPacket); break; default: - Trace.WriteLine("<-- Unknown"); + Logger.Trace("<-- Unknown"); Console.WriteLine($"Unknown packet received: {packet}"); break; } // switch (packet) } // while - Trace.WriteLine($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Exiting...{this.connectState}"); + Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Exiting...{this.connectState}"); + + return true; + }, cancellationToken); - // Launch the AfterDisconnect event only in certain cases - switch (this.connectState) + /// + /// Handle an incoming Publish packet. + /// + /// + internal void HandleIncomingPublishPacket(PublishPacket publishPacket) + { + Logger.Trace("<-- Publish"); + if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery) + { + // We've received a QoS 1 publish. Send a PubAck. + var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success); + this.sendQueue.Enqueue(pubAckResponse); + } + else if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery) + { + // We've received a QoS 2 publish. Send a PubRec and add to transaction list. + var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success); + var transaction = new List { publishPacket, pubRecResponse }; + + if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false) { - case ConnectState.Disconnecting: - case ConnectState.Disconnected: - // This disconnect was either user or broker initiated. - // Launch the AfterDisconnect event with a clean disconnect set to true. - this.AfterDisconnectEventLauncher(true); - break; - case ConnectState.Connected: - // This is an unexpected exit and may be due to a failure. - // Launch the AfterDisconnect event with a clean disconnect set to false. - this.AfterDisconnectEventLauncher(false); - break; - case ConnectState.Connecting: - break; - default: - break; + // FIXME: Log, trace to assist debugging + pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse; } - return true; - }, cancellationToken); + this.sendQueue.Enqueue(pubRecResponse); + } + + this.OnMessageReceivedEventLauncher(publishPacket); + } + + /// + /// Handle an incoming ConnAck packet. + /// + /// + /// + internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket) + { + Logger.Trace("<-- PubAck"); + if (this.transactionQueue.Remove(pubAckPacket.PacketIdentifier, out var publishQoS1Chain)) + { + var publishPacket = (PublishPacket)publishQoS1Chain.First(); + + // Trigger the packet specific event + publishPacket.OnPublishQoS1CompleteEventLauncher(pubAckPacket); + } + else + { + throw new HiveMQttClientException("Received PubAck with an unknown packet identifier: ¯\\_(ツ)_/¯"); + } + + this.OnPubAckReceivedEventLauncher(pubAckPacket); + } + /// + /// Handle an incoming PubRec packet. + /// + /// + internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket) + { + Logger.Trace("<-- PubRec"); + PubRelPacket pubRelResponsePacket; + if (this.transactionQueue.TryGetValue(pubRecPacket.PacketIdentifier, out var publishQoS2Chain)) + { + var publishPacket = (PublishPacket)publishQoS2Chain.First(); + + // Trigger the packet specific event + publishPacket.OnPublishQoS2CompleteEventLauncher(pubRecPacket); + + // Add the PUBREC to the chain + publishQoS2Chain.Add(pubRecPacket); + + // Send and add a PUBREL to the chain + pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.Success); + this.sendQueue.Enqueue(pubRelResponsePacket); + publishQoS2Chain.Add(pubRelResponsePacket); + } + else + { + pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.PacketIdentifierNotFound); + this.sendQueue.Enqueue(pubRelResponsePacket); + } + + this.OnPubRecReceivedEventLauncher(pubRecPacket); + } + + /// + /// Handle an incoming PubRel packet. + /// + /// + internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket) + { + Logger.Trace("<-- PubRel"); + PubCompPacket pubCompResponsePacket; + if (this.transactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var pubRelQoS2Chain)) + { + // Send a PUBCOMP + pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.Success); + } + else + { + // Send a PUBCOMP with PacketIdentifierNotFound + pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.PacketIdentifierNotFound); + } + + this.sendQueue.Enqueue(pubCompResponsePacket); + this.OnPubRelReceivedEventLauncher(pubRelPacket); + } + + /// + /// Handle an incoming PubComp packet. + /// + /// + /// + internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket) + { + Logger.Trace("<-- PubComp"); + if (!this.transactionQueue.Remove(pubCompPacket.PacketIdentifier, out var pubcompQoS2Chain)) + { + throw new HiveMQttClientException("Received PubComp with an unknown packet identifier: ¯\\_(ツ)_/¯"); + } + + this.OnPubCompReceivedEventLauncher(pubCompPacket); + } + + /// + /// Write a buffer to the stream. + /// + /// + /// + /// + /// internal ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken = default) { if (this.writer is null) @@ -382,6 +447,12 @@ internal ValueTask WriteAsync(ReadOnlyMemory source, Cancella return this.writer.WriteAsync(source, cancellationToken); } + /// + /// Read a buffer from the stream. + /// + /// + /// + /// internal ValueTask ReadAsync(CancellationToken cancellationToken = default) { if (this.reader is null) diff --git a/Source/HiveMQtt/MQTT5/ControlPacket.cs b/Source/HiveMQtt/MQTT5/ControlPacket.cs index 2f627eb3..7a80156e 100644 --- a/Source/HiveMQtt/MQTT5/ControlPacket.cs +++ b/Source/HiveMQtt/MQTT5/ControlPacket.cs @@ -27,6 +27,8 @@ namespace HiveMQtt.MQTT5; /// public abstract class ControlPacket { + internal static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger(); + public ControlPacket() => this.Properties = new MQTT5Properties(); /// diff --git a/Source/HiveMQtt/MQTT5/PacketDecoder.cs b/Source/HiveMQtt/MQTT5/PacketDecoder.cs index 35cbc137..0d473068 100644 --- a/Source/HiveMQtt/MQTT5/PacketDecoder.cs +++ b/Source/HiveMQtt/MQTT5/PacketDecoder.cs @@ -16,7 +16,6 @@ namespace HiveMQtt.MQTT5; using System.Buffers; -using System.Diagnostics; using HiveMQtt.MQTT5.Packets; /// @@ -24,6 +23,8 @@ namespace HiveMQtt.MQTT5; /// internal class PacketDecoder { + private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger(); + public static ControlPacket Decode(ReadOnlySequence buffer, out SequencePosition consumed) { try @@ -75,9 +76,9 @@ public static ControlPacket Decode(ReadOnlySequence buffer, out SequencePo consumed = buffer.GetPosition(packetLength); return packet; } - catch (System.Exception) + catch (Exception) { - Trace.WriteLine("PacketDecoder.Decode: Exception caught. Returning MalformedPacket."); + Logger.Trace("PacketDecoder.Decode: Exception caught. Returning MalformedPacket."); consumed = buffer.Start; return new MalformedPacket(buffer); } diff --git a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs index 0dd0a0e0..6cf202e6 100644 --- a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs @@ -16,7 +16,6 @@ namespace HiveMQtt.MQTT5.Packets; using System.Buffers; -using System.Diagnostics; using System.IO; using HiveMQtt.Client.Events; using HiveMQtt.MQTT5.Types; @@ -68,7 +67,7 @@ public PublishPacket(ReadOnlySequence packetData) internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet) { var eventArgs = new OnPublishQoS1CompleteEventArgs(packet); - Trace.WriteLine("OnPublishQoS1CompleteEventLauncher"); + Logger.Trace("OnPublishQoS1CompleteEventLauncher"); this.OnPublishQoS1Complete?.Invoke(this, eventArgs); } @@ -80,7 +79,7 @@ internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet) internal virtual void OnPublishQoS2CompleteEventLauncher(PubRecPacket packet) { var eventArgs = new OnPublishQoS2CompleteEventArgs(packet); - Trace.WriteLine("OnPublishQoS2CompleteEventLauncher"); + Logger.Trace("OnPublishQoS2CompleteEventLauncher"); this.OnPublishQoS2Complete?.Invoke(this, eventArgs); } diff --git a/Source/HiveMQtt/NLog.config b/Source/HiveMQtt/NLog.config index 230d57e4..9408b03f 100644 --- a/Source/HiveMQtt/NLog.config +++ b/Source/HiveMQtt/NLog.config @@ -1,10 +1,6 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + - - - +