diff --git a/.vscode/launch.json b/.vscode/launch.json index e0a489c3..cbdd6229 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -1,27 +1,37 @@ { "version": "0.2.0", "configurations": [ - - { - // Use IntelliSense to find out which attributes exist for C# debugging - // Use hover for the description of the existing attributes - // For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md - "name": ".NET Core Launch (console)", - "type": "coreclr", - "request": "launch", - "preLaunchTask": "build", - // If you have changed target frameworks, make sure to update the program path. - "program": "${workspaceFolder}/Tests/HiveMQtt.Test/bin/Debug/net8.0/HiveMQtt.Test.dll", - "args": [], - "cwd": "${workspaceFolder}/Tests/HiveMQtt.Test", - // For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console - "console": "integratedTerminal", - "stopAtEntry": false - }, - { - "name": ".NET Core Attach", - "type": "coreclr", - "request": "attach" - } + { + "name": "SendMessageOnLoop .NET Core Launch (console)", + "type": "coreclr", + "request": "launch", + "preLaunchTask": "build SendMessageOnLoop", + "program": "${workspaceFolder}/Examples/SendMessageOnLoop/bin/Debug/net8.0/SendMessageOnLoop.dll", + "args": [], + "cwd": "${workspaceFolder}", + "stopAtEntry": false, + "console": "internalConsole" + }, + { + "name": ".NET Core Attach", + "type": "coreclr", + "request": "attach" + }, + { + // Use IntelliSense to find out which attributes exist for C# debugging + // Use hover for the description of the existing attributes + // For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md + "name": ".NET Core Launch (console)", + "type": "coreclr", + "request": "launch", + "preLaunchTask": "build", + // If you have changed target frameworks, make sure to update the program path. + "program": "${workspaceFolder}/Tests/HiveMQtt.Test/bin/Debug/net8.0/HiveMQtt.Test.dll", + "args": [], + "cwd": "${workspaceFolder}/Tests/HiveMQtt.Test", + // For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console + "console": "integratedTerminal", + "stopAtEntry": false + } ] } diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 299d19b1..42acad41 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -44,6 +44,20 @@ "/consoleloggerparameters:NoSummary" ], "problemMatcher": "$msCompile", + "group": { + "kind": "build", + "isDefault": true + } + }, + { + "label": "build SendMessageOnLoop", + "command": "dotnet", + "type": "process", + "args": [ + "build", + "${workspaceFolder}/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj", + ], + "problemMatcher": "$msCompile", "group": "build" }, { @@ -80,10 +94,7 @@ "${workspaceFolder}/Examples/SubscriberWithEvents/SubscriberWithEvents.csproj" ], "problemMatcher": "$msCompile", - "group": { - "kind": "build", - "isDefault": true - } + "group": "build" } ] } diff --git a/Examples/Reconnect/Reconnect.csproj b/Examples/Reconnect/Reconnect.csproj index da623a8a..6caddeab 100644 --- a/Examples/Reconnect/Reconnect.csproj +++ b/Examples/Reconnect/Reconnect.csproj @@ -7,17 +7,12 @@ enable - - - $(RestoreSources);../../Source/HiveMQtt/bin/Debug/;https://api.nuget.org/v3/index.json - - - 4 + - + diff --git a/Examples/SendMessageOnLoop/Program.cs b/Examples/SendMessageOnLoop/Program.cs index 9bda0861..aaab585e 100644 --- a/Examples/SendMessageOnLoop/Program.cs +++ b/Examples/SendMessageOnLoop/Program.cs @@ -3,6 +3,7 @@ using HiveMQtt.Client; using HiveMQtt.Client.Options; using HiveMQtt.MQTT5.Types; +using HiveMQtt.Client.Exceptions; // Connect to localhost:1883. Change for brokers elsewhere. // Run HiveMQ CE locally: docker run --name hivemq-ce -d -p 1883:1883 hivemq/hivemq-ce @@ -17,6 +18,78 @@ var client = new HiveMQClient(options); + +// This handler is called when the client is disconnected +client.AfterDisconnect += async (sender, args) => +{ + var client = (HiveMQClient)sender; + + Console.WriteLine($"AfterDisconnect Handler called with args.CleanDisconnect={args.CleanDisconnect}."); + + // We've been disconnected + if (args.CleanDisconnect) + { + Console.WriteLine("--> AfterDisconnectEventArgs indicate a clean disconnect."); + Console.WriteLine("--> A clean disconnect was requested by either the client or the broker."); + } + else + { + Console.WriteLine("--> AfterDisconnectEventArgs indicate an unexpected disconnect."); + Console.WriteLine("--> This could be due to a network outage, broker outage, or other issue."); + Console.WriteLine("--> In this case we will attempt to reconnect periodically."); + + // We could have been disconnected for any number of reasons: network outage, broker outage, etc. + // Here we loop with a backing off delay until we reconnect + + // Start with a small delay and double it on each retry up to a maximum value + var delay = 5000; + var reconnectAttempts = 0; + var maxReconnectAttempts = 15; + + while (true) + { + await Task.Delay(delay).ConfigureAwait(false); + reconnectAttempts++; + + try + { + Console.WriteLine($"--> Attempting to reconnect to broker. This is attempt #{reconnectAttempts}."); + var connectResult = await client.ConnectAsync().ConfigureAwait(false); + + if (connectResult.ReasonCode != HiveMQtt.MQTT5.ReasonCodes.ConnAckReasonCode.Success) + { + Console.WriteLine($"--> Failed to connect: {connectResult.ReasonString}"); + + // Double the delay with each failed retry to a maximum + delay = Math.Min(delay * 2, 60000); + Console.WriteLine($"--> Will delay for {delay / 1000} seconds until next try."); + } + else + { + Console.WriteLine("--> Reconnected successfully."); + break; + } + } + catch (HiveMQttClientException ex) + { + Console.WriteLine($"--> Failed to reconnect: {ex.Message}"); + + if (reconnectAttempts > maxReconnectAttempts) + { + Console.WriteLine("--> Maximum reconnect attempts exceeded. Exiting."); + break; + } + + // Double the delay with each failed retry to a maximum + delay = Math.Min(delay * 2, 60000); + Console.WriteLine($"--> Will delay for {delay / 1000} seconds until next try."); + } + } + } // if (args.CleanDisconnect) + + Console.WriteLine("--> Exiting AfterDisconnect handler."); +}; + // Connect to the broker var connectResult = await client.ConnectAsync().ConfigureAwait(false); if (connectResult.ReasonCode != HiveMQtt.MQTT5.ReasonCodes.ConnAckReasonCode.Success) @@ -25,7 +98,7 @@ } // Example Settings -var wait = 50; // The delay between each message +var wait = 1000; // The delay between each message Console.WriteLine($"Starting {wait / 1000} second loop... (press q to exit)"); // Topic to send messages to @@ -39,32 +112,31 @@ // while (true) { - message_number++; - var payload = JsonSerializer.Serialize(new + if (client.IsConnected()) { - Content = "SendMessageOnLoop", - MessageNumber = message_number, - }); - - var message = new MQTT5PublishMessage - { - Topic = topic, - Payload = Encoding.ASCII.GetBytes(payload), - QoS = QualityOfService.ExactlyOnceDelivery, - }; - - var resultPublish = await client.PublishAsync(message).ConfigureAwait(false); - Console.WriteLine($"Published message {message_number} to topic {topic}: {resultPublish.QoS2ReasonCode}"); + message_number++; + var payload = JsonSerializer.Serialize(new + { + Content = "SendMessageOnLoop", + MessageNumber = message_number, + }); - await Task.Delay(wait).ConfigureAwait(false); + var message = new MQTT5PublishMessage + { + Topic = topic, + Payload = Encoding.ASCII.GetBytes(payload), + QoS = QualityOfService.ExactlyOnceDelivery, + }; - if (Console.KeyAvailable) + var resultPublish = await client.PublishAsync(message).ConfigureAwait(false); + Console.WriteLine($"Published message {message_number} to topic {topic}: {resultPublish.QoS2ReasonCode}"); + } + else { - if (Console.ReadKey().Key == ConsoleKey.Q) - { - break; - } + Console.WriteLine("Client is not connected. Standing by..."); } + + await Task.Delay(wait).ConfigureAwait(false); } Console.WriteLine("Disconnecting gracefully..."); diff --git a/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj b/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj index 94690f8f..e53d1bd8 100644 --- a/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj +++ b/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj @@ -2,17 +2,12 @@ Exe - net6.0 + net8.0 enable enable - - $(RestoreSources);../../Source/HiveMQtt/bin/Debug/;https://api.nuget.org/v3/index.json - - - + - diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index f3ef97dc..b1228938 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -93,17 +93,17 @@ public async Task ConnectAsync() Logger.Trace($"Queuing packet for send: {connPacket}"); this.SendQueue.Add(connPacket); - // FIXME: Cancellation token and better timeout value ConnAckPacket connAck; ConnectResult connectResult; try { - connAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); + connAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ConnectTimeoutInMs)).ConfigureAwait(false); } catch (TimeoutException) { this.ConnectState = ConnectState.Disconnected; - throw new HiveMQttClientException("Connect timeout. No response received in time."); + Logger.Error($"Connect timeout. No response received in {this.Options.ConnectTimeoutInMs} milliseconds."); + throw new HiveMQttClientException($"Connect timeout. No response received in {this.Options.ConnectTimeoutInMs} milliseconds."); } finally { @@ -178,40 +178,19 @@ public async Task DisconnectAsync(DisconnectOptions? options = null) this.OnDisconnectSent -= eventHandler; } - this.HandleDisconnection(); + await this.HandleDisconnectionAsync().ConfigureAwait(false); return true; } - /// - /// Close the socket and set the connect state to disconnected. - /// - private void HandleDisconnection() + /// + public async Task PublishAsync(MQTT5PublishMessage message) { - Logger.Debug("HandleDisconnection: Connection lost. Handling Disconnection."); - - this.CloseSocket(); - - // Fire the corresponding event - this.AfterDisconnectEventLauncher(false); - - this.ConnectState = ConnectState.Disconnected; - - // FIXME - if (this.SendQueue.Count > 0) + if (this.IsConnected() == false) { - Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting (or were disconnected)."); + throw new HiveMQttClientException("PublishAsync: Client is not connected. Check client.IsConnected() before calling PublishAsync."); } - // We only clear the send queue on explicit disconnect - while (this.SendQueue.TryTake(out _)) - { - } - } - - /// - public async Task PublishAsync(MQTT5PublishMessage message) - { message.Validate(); var packetIdentifier = this.GeneratePacketIdentifier(); @@ -244,6 +223,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) else if (message.QoS == QualityOfService.ExactlyOnceDelivery) { // QoS 2: Assured Delivery + PublishResult? publishResult = null; var taskCompletionSource = new TaskCompletionSource>(); void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PacketList); EventHandler eventHandler = TaskHandler; @@ -252,10 +232,21 @@ public async Task PublishAsync(MQTT5PublishMessage message) // Construct the MQTT Connect packet and queue to send this.SendQueue.Add(publishPacket); - // Wait on the QoS 2 handshake - var packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false); + List packetList; + try + { + // Wait on the QoS 2 handshake + packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false); + } + catch (TimeoutException) + { + Logger.Error("PublishAsync: QoS 2 timeout. No response received in time."); + publishResult = new PublishResult(publishPacket.Message); + publishResult.QoS2ReasonCode = null; + publishPacket.OnPublishQoS2Complete -= eventHandler; + return publishResult; + } - PublishResult? publishResult = null; foreach (var packet in packetList) { if (packet is PubRecPacket pubRecPacket) @@ -318,6 +309,11 @@ public async Task SubscribeAsync(string topic, QualityOfService /// public async Task SubscribeAsync(SubscribeOptions options) { + if (this.IsConnected() == false) + { + throw new HiveMQttClientException("SubscribeAsync: Client is not connected. Check client.IsConnected() before calling SubscribeAsync."); + } + // Fire the corresponding event this.BeforeSubscribeEventLauncher(options); @@ -426,6 +422,11 @@ public async Task UnsubscribeAsync(List subscri public async Task UnsubscribeAsync(UnsubscribeOptions unsubOptions) { + if (this.IsConnected() == false) + { + throw new HiveMQttClientException("UnsubscribeAsync: Client is not connected. Check client.IsConnected() before calling UnsubscribeAsync."); + } + // Fire the corresponding event this.BeforeUnsubscribeEventLauncher(unsubOptions.Subscriptions); @@ -480,4 +481,35 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp this.AfterUnsubscribeEventLauncher(unsubscribeResult); return unsubscribeResult; } + + /// + /// Close the socket and set the connect state to disconnected. + /// + /// Indicates whether the disconnect was intended or not. + private async Task HandleDisconnectionAsync(bool clean = true) + { + Logger.Debug($"HandleDisconnection: Handling disconnection. clean={clean}."); + + // Cancel all background tasks and close the socket + this.ConnectState = ConnectState.Disconnected; + await this.cancellationTokenSource.CancelAsync().ConfigureAwait(false); + this.CloseSocket(); + + if (clean) + { + if (this.SendQueue.Count > 0) + { + Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting."); + } + + // We only clear the send queue on explicit disconnect + while (this.SendQueue.TryTake(out _)) + { + } + } + + // Fire the corresponding after event + this.AfterDisconnectEventLauncher(clean); + return true; + } } diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index 6319176d..c0aa1e66 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -247,22 +247,14 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => if (readResult.IsCompleted) { Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream"); - if (this.ConnectState == ConnectState.Connected) { // This is an unexpected exit and may be due to a network failure. - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: was unexpected"); - this.ConnectState = ConnectState.Disconnected; - - // Launch the AfterDisconnect event with a clean disconnect set to false. - this.AfterDisconnectEventLauncher(false); - - await this.cancellationTokenSource.CancelAsync().ConfigureAwait(false); - - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); - return false; + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: this was unexpected"); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } + Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); return true; } @@ -356,7 +348,7 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok case DisconnectPacket disconnectPacket: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier}"); Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - this.HandleDisconnection(); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); this.OnDisconnectReceivedEventLauncher(disconnectPacket); break; case PingRespPacket pingRespPacket: @@ -422,6 +414,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket) var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success); var publishQoS2Chain = new List { publishPacket, pubRecResponse }; + // FIXME: Wait for QoS 2 transaction to complete before calling OnMessageReceivedEventLauncher??? if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, publishQoS2Chain) == false) { Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an incoming QoS {publishPacket.Message.QoS} publish ."); diff --git a/Source/HiveMQtt/Client/Options/ConnectOptions.cs b/Source/HiveMQtt/Client/Options/ConnectOptions.cs deleted file mode 100644 index d6724905..00000000 --- a/Source/HiveMQtt/Client/Options/ConnectOptions.cs +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright 2022-present HiveMQ and the HiveMQ Community - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -namespace HiveMQtt.Client.Options; - -/// -/// Options class for connect. -/// -public class ConnectOptions -{ - public ConnectOptions() - { - this.CleanStart = true; - this.KeepAlive = 60; - } - - public bool CleanStart { get; set; } - - public int KeepAlive { get; set; } - - public string? ClientId { get; set; } -} diff --git a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs index 58545804..0971f6cd 100644 --- a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs +++ b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs @@ -44,6 +44,7 @@ public HiveMQClientOptions() this.UseTLS = false; this.AllowInvalidBrokerCertificates = false; this.ClientCertificates = new X509CertificateCollection(); + this.ConnectTimeoutInMs = 5000; } // Client Identifier to be used in the Client. Will be set automatically if not specified. @@ -172,6 +173,11 @@ public HiveMQClientOptions() /// public LastWillAndTestament? LastWillAndTestament { get; set; } + /// + /// Gets or sets the time in milliseconds to wait for a connection to be established. + /// + public int ConnectTimeoutInMs { get; set; } + /// /// Generate a semi-random client identifier to be used in Client connections. /// hmqc#-pid-randomstring. diff --git a/Source/HiveMQtt/Client/Results/PublishResult.cs b/Source/HiveMQtt/Client/Results/PublishResult.cs index 083769c9..7c403ddd 100644 --- a/Source/HiveMQtt/Client/Results/PublishResult.cs +++ b/Source/HiveMQtt/Client/Results/PublishResult.cs @@ -40,23 +40,25 @@ public PublishResult(MQTT5PublishMessage message, PubAckPacket pubAckPacket) { this.Message = message; this.pubAckPacket = pubAckPacket; + this.QoS1ReasonCode = pubAckPacket.ReasonCode; } public PublishResult(MQTT5PublishMessage message, PubRecPacket pubRecPacket) { this.Message = message; this.pubRecPacket = pubRecPacket; + this.QoS2ReasonCode = pubRecPacket.ReasonCode; } /// - /// Gets the reason code of the PubAck packet for QoS 1 publishes. + /// Gets or sets the reason code of the PubAck packet for QoS 1 publishes. /// - public PubAckReasonCode? QoS1ReasonCode => this.pubAckPacket?.ReasonCode; + public PubAckReasonCode? QoS1ReasonCode { get; set; } /// - /// Gets the reason code of the PubRec packet for QoS 2 publishes. + /// Gets or sets the reason code of the PubRec packet for QoS 2 publishes. /// - public PubRecReasonCode? QoS2ReasonCode => this.pubRecPacket?.ReasonCode; + public PubRecReasonCode? QoS2ReasonCode { get; set; } /// /// Gets the message that was published.