diff --git a/.vscode/launch.json b/.vscode/launch.json
index e0a489c3..cbdd6229 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -1,27 +1,37 @@
{
"version": "0.2.0",
"configurations": [
-
- {
- // Use IntelliSense to find out which attributes exist for C# debugging
- // Use hover for the description of the existing attributes
- // For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md
- "name": ".NET Core Launch (console)",
- "type": "coreclr",
- "request": "launch",
- "preLaunchTask": "build",
- // If you have changed target frameworks, make sure to update the program path.
- "program": "${workspaceFolder}/Tests/HiveMQtt.Test/bin/Debug/net8.0/HiveMQtt.Test.dll",
- "args": [],
- "cwd": "${workspaceFolder}/Tests/HiveMQtt.Test",
- // For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console
- "console": "integratedTerminal",
- "stopAtEntry": false
- },
- {
- "name": ".NET Core Attach",
- "type": "coreclr",
- "request": "attach"
- }
+ {
+ "name": "SendMessageOnLoop .NET Core Launch (console)",
+ "type": "coreclr",
+ "request": "launch",
+ "preLaunchTask": "build SendMessageOnLoop",
+ "program": "${workspaceFolder}/Examples/SendMessageOnLoop/bin/Debug/net8.0/SendMessageOnLoop.dll",
+ "args": [],
+ "cwd": "${workspaceFolder}",
+ "stopAtEntry": false,
+ "console": "internalConsole"
+ },
+ {
+ "name": ".NET Core Attach",
+ "type": "coreclr",
+ "request": "attach"
+ },
+ {
+ // Use IntelliSense to find out which attributes exist for C# debugging
+ // Use hover for the description of the existing attributes
+ // For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md
+ "name": ".NET Core Launch (console)",
+ "type": "coreclr",
+ "request": "launch",
+ "preLaunchTask": "build",
+ // If you have changed target frameworks, make sure to update the program path.
+ "program": "${workspaceFolder}/Tests/HiveMQtt.Test/bin/Debug/net8.0/HiveMQtt.Test.dll",
+ "args": [],
+ "cwd": "${workspaceFolder}/Tests/HiveMQtt.Test",
+ // For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console
+ "console": "integratedTerminal",
+ "stopAtEntry": false
+ }
]
}
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 299d19b1..42acad41 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -44,6 +44,20 @@
"/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile",
+ "group": {
+ "kind": "build",
+ "isDefault": true
+ }
+ },
+ {
+ "label": "build SendMessageOnLoop",
+ "command": "dotnet",
+ "type": "process",
+ "args": [
+ "build",
+ "${workspaceFolder}/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj",
+ ],
+ "problemMatcher": "$msCompile",
"group": "build"
},
{
@@ -80,10 +94,7 @@
"${workspaceFolder}/Examples/SubscriberWithEvents/SubscriberWithEvents.csproj"
],
"problemMatcher": "$msCompile",
- "group": {
- "kind": "build",
- "isDefault": true
- }
+ "group": "build"
}
]
}
diff --git a/Examples/Reconnect/Reconnect.csproj b/Examples/Reconnect/Reconnect.csproj
index da623a8a..6caddeab 100644
--- a/Examples/Reconnect/Reconnect.csproj
+++ b/Examples/Reconnect/Reconnect.csproj
@@ -7,17 +7,12 @@
enable
-
-
- $(RestoreSources);../../Source/HiveMQtt/bin/Debug/;https://api.nuget.org/v3/index.json
-
-
-
4
+
-
+
diff --git a/Examples/SendMessageOnLoop/Program.cs b/Examples/SendMessageOnLoop/Program.cs
index 9bda0861..aaab585e 100644
--- a/Examples/SendMessageOnLoop/Program.cs
+++ b/Examples/SendMessageOnLoop/Program.cs
@@ -3,6 +3,7 @@
using HiveMQtt.Client;
using HiveMQtt.Client.Options;
using HiveMQtt.MQTT5.Types;
+using HiveMQtt.Client.Exceptions;
// Connect to localhost:1883. Change for brokers elsewhere.
// Run HiveMQ CE locally: docker run --name hivemq-ce -d -p 1883:1883 hivemq/hivemq-ce
@@ -17,6 +18,78 @@
var client = new HiveMQClient(options);
+
+// This handler is called when the client is disconnected
+client.AfterDisconnect += async (sender, args) =>
+{
+ var client = (HiveMQClient)sender;
+
+ Console.WriteLine($"AfterDisconnect Handler called with args.CleanDisconnect={args.CleanDisconnect}.");
+
+ // We've been disconnected
+ if (args.CleanDisconnect)
+ {
+ Console.WriteLine("--> AfterDisconnectEventArgs indicate a clean disconnect.");
+ Console.WriteLine("--> A clean disconnect was requested by either the client or the broker.");
+ }
+ else
+ {
+ Console.WriteLine("--> AfterDisconnectEventArgs indicate an unexpected disconnect.");
+ Console.WriteLine("--> This could be due to a network outage, broker outage, or other issue.");
+ Console.WriteLine("--> In this case we will attempt to reconnect periodically.");
+
+ // We could have been disconnected for any number of reasons: network outage, broker outage, etc.
+ // Here we loop with a backing off delay until we reconnect
+
+ // Start with a small delay and double it on each retry up to a maximum value
+ var delay = 5000;
+ var reconnectAttempts = 0;
+ var maxReconnectAttempts = 15;
+
+ while (true)
+ {
+ await Task.Delay(delay).ConfigureAwait(false);
+ reconnectAttempts++;
+
+ try
+ {
+ Console.WriteLine($"--> Attempting to reconnect to broker. This is attempt #{reconnectAttempts}.");
+ var connectResult = await client.ConnectAsync().ConfigureAwait(false);
+
+ if (connectResult.ReasonCode != HiveMQtt.MQTT5.ReasonCodes.ConnAckReasonCode.Success)
+ {
+ Console.WriteLine($"--> Failed to connect: {connectResult.ReasonString}");
+
+ // Double the delay with each failed retry to a maximum
+ delay = Math.Min(delay * 2, 60000);
+ Console.WriteLine($"--> Will delay for {delay / 1000} seconds until next try.");
+ }
+ else
+ {
+ Console.WriteLine("--> Reconnected successfully.");
+ break;
+ }
+ }
+ catch (HiveMQttClientException ex)
+ {
+ Console.WriteLine($"--> Failed to reconnect: {ex.Message}");
+
+ if (reconnectAttempts > maxReconnectAttempts)
+ {
+ Console.WriteLine("--> Maximum reconnect attempts exceeded. Exiting.");
+ break;
+ }
+
+ // Double the delay with each failed retry to a maximum
+ delay = Math.Min(delay * 2, 60000);
+ Console.WriteLine($"--> Will delay for {delay / 1000} seconds until next try.");
+ }
+ }
+ } // if (args.CleanDisconnect)
+
+ Console.WriteLine("--> Exiting AfterDisconnect handler.");
+};
+
// Connect to the broker
var connectResult = await client.ConnectAsync().ConfigureAwait(false);
if (connectResult.ReasonCode != HiveMQtt.MQTT5.ReasonCodes.ConnAckReasonCode.Success)
@@ -25,7 +98,7 @@
}
// Example Settings
-var wait = 50; // The delay between each message
+var wait = 1000; // The delay between each message
Console.WriteLine($"Starting {wait / 1000} second loop... (press q to exit)");
// Topic to send messages to
@@ -39,32 +112,31 @@
//
while (true)
{
- message_number++;
- var payload = JsonSerializer.Serialize(new
+ if (client.IsConnected())
{
- Content = "SendMessageOnLoop",
- MessageNumber = message_number,
- });
-
- var message = new MQTT5PublishMessage
- {
- Topic = topic,
- Payload = Encoding.ASCII.GetBytes(payload),
- QoS = QualityOfService.ExactlyOnceDelivery,
- };
-
- var resultPublish = await client.PublishAsync(message).ConfigureAwait(false);
- Console.WriteLine($"Published message {message_number} to topic {topic}: {resultPublish.QoS2ReasonCode}");
+ message_number++;
+ var payload = JsonSerializer.Serialize(new
+ {
+ Content = "SendMessageOnLoop",
+ MessageNumber = message_number,
+ });
- await Task.Delay(wait).ConfigureAwait(false);
+ var message = new MQTT5PublishMessage
+ {
+ Topic = topic,
+ Payload = Encoding.ASCII.GetBytes(payload),
+ QoS = QualityOfService.ExactlyOnceDelivery,
+ };
- if (Console.KeyAvailable)
+ var resultPublish = await client.PublishAsync(message).ConfigureAwait(false);
+ Console.WriteLine($"Published message {message_number} to topic {topic}: {resultPublish.QoS2ReasonCode}");
+ }
+ else
{
- if (Console.ReadKey().Key == ConsoleKey.Q)
- {
- break;
- }
+ Console.WriteLine("Client is not connected. Standing by...");
}
+
+ await Task.Delay(wait).ConfigureAwait(false);
}
Console.WriteLine("Disconnecting gracefully...");
diff --git a/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj b/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj
index 94690f8f..e53d1bd8 100644
--- a/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj
+++ b/Examples/SendMessageOnLoop/SendMessageOnLoop.csproj
@@ -2,17 +2,12 @@
Exe
- net6.0
+ net8.0
enable
enable
-
- $(RestoreSources);../../Source/HiveMQtt/bin/Debug/;https://api.nuget.org/v3/index.json
-
-
-
+
-
diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs
index f3ef97dc..b1228938 100644
--- a/Source/HiveMQtt/Client/HiveMQClient.cs
+++ b/Source/HiveMQtt/Client/HiveMQClient.cs
@@ -93,17 +93,17 @@ public async Task ConnectAsync()
Logger.Trace($"Queuing packet for send: {connPacket}");
this.SendQueue.Add(connPacket);
- // FIXME: Cancellation token and better timeout value
ConnAckPacket connAck;
ConnectResult connectResult;
try
{
- connAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
+ connAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromMilliseconds(this.Options.ConnectTimeoutInMs)).ConfigureAwait(false);
}
catch (TimeoutException)
{
this.ConnectState = ConnectState.Disconnected;
- throw new HiveMQttClientException("Connect timeout. No response received in time.");
+ Logger.Error($"Connect timeout. No response received in {this.Options.ConnectTimeoutInMs} milliseconds.");
+ throw new HiveMQttClientException($"Connect timeout. No response received in {this.Options.ConnectTimeoutInMs} milliseconds.");
}
finally
{
@@ -178,40 +178,19 @@ public async Task DisconnectAsync(DisconnectOptions? options = null)
this.OnDisconnectSent -= eventHandler;
}
- this.HandleDisconnection();
+ await this.HandleDisconnectionAsync().ConfigureAwait(false);
return true;
}
- ///
- /// Close the socket and set the connect state to disconnected.
- ///
- private void HandleDisconnection()
+ ///
+ public async Task PublishAsync(MQTT5PublishMessage message)
{
- Logger.Debug("HandleDisconnection: Connection lost. Handling Disconnection.");
-
- this.CloseSocket();
-
- // Fire the corresponding event
- this.AfterDisconnectEventLauncher(false);
-
- this.ConnectState = ConnectState.Disconnected;
-
- // FIXME
- if (this.SendQueue.Count > 0)
+ if (this.IsConnected() == false)
{
- Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting (or were disconnected).");
+ throw new HiveMQttClientException("PublishAsync: Client is not connected. Check client.IsConnected() before calling PublishAsync.");
}
- // We only clear the send queue on explicit disconnect
- while (this.SendQueue.TryTake(out _))
- {
- }
- }
-
- ///
- public async Task PublishAsync(MQTT5PublishMessage message)
- {
message.Validate();
var packetIdentifier = this.GeneratePacketIdentifier();
@@ -244,6 +223,7 @@ public async Task PublishAsync(MQTT5PublishMessage message)
else if (message.QoS == QualityOfService.ExactlyOnceDelivery)
{
// QoS 2: Assured Delivery
+ PublishResult? publishResult = null;
var taskCompletionSource = new TaskCompletionSource>();
void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PacketList);
EventHandler eventHandler = TaskHandler;
@@ -252,10 +232,21 @@ public async Task PublishAsync(MQTT5PublishMessage message)
// Construct the MQTT Connect packet and queue to send
this.SendQueue.Add(publishPacket);
- // Wait on the QoS 2 handshake
- var packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
+ List packetList;
+ try
+ {
+ // Wait on the QoS 2 handshake
+ packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
+ }
+ catch (TimeoutException)
+ {
+ Logger.Error("PublishAsync: QoS 2 timeout. No response received in time.");
+ publishResult = new PublishResult(publishPacket.Message);
+ publishResult.QoS2ReasonCode = null;
+ publishPacket.OnPublishQoS2Complete -= eventHandler;
+ return publishResult;
+ }
- PublishResult? publishResult = null;
foreach (var packet in packetList)
{
if (packet is PubRecPacket pubRecPacket)
@@ -318,6 +309,11 @@ public async Task SubscribeAsync(string topic, QualityOfService
///
public async Task SubscribeAsync(SubscribeOptions options)
{
+ if (this.IsConnected() == false)
+ {
+ throw new HiveMQttClientException("SubscribeAsync: Client is not connected. Check client.IsConnected() before calling SubscribeAsync.");
+ }
+
// Fire the corresponding event
this.BeforeSubscribeEventLauncher(options);
@@ -426,6 +422,11 @@ public async Task UnsubscribeAsync(List subscri
public async Task UnsubscribeAsync(UnsubscribeOptions unsubOptions)
{
+ if (this.IsConnected() == false)
+ {
+ throw new HiveMQttClientException("UnsubscribeAsync: Client is not connected. Check client.IsConnected() before calling UnsubscribeAsync.");
+ }
+
// Fire the corresponding event
this.BeforeUnsubscribeEventLauncher(unsubOptions.Subscriptions);
@@ -480,4 +481,35 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp
this.AfterUnsubscribeEventLauncher(unsubscribeResult);
return unsubscribeResult;
}
+
+ ///
+ /// Close the socket and set the connect state to disconnected.
+ ///
+ /// Indicates whether the disconnect was intended or not.
+ private async Task HandleDisconnectionAsync(bool clean = true)
+ {
+ Logger.Debug($"HandleDisconnection: Handling disconnection. clean={clean}.");
+
+ // Cancel all background tasks and close the socket
+ this.ConnectState = ConnectState.Disconnected;
+ await this.cancellationTokenSource.CancelAsync().ConfigureAwait(false);
+ this.CloseSocket();
+
+ if (clean)
+ {
+ if (this.SendQueue.Count > 0)
+ {
+ Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting.");
+ }
+
+ // We only clear the send queue on explicit disconnect
+ while (this.SendQueue.TryTake(out _))
+ {
+ }
+ }
+
+ // Fire the corresponding after event
+ this.AfterDisconnectEventLauncher(clean);
+ return true;
+ }
}
diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
index 6319176d..c0aa1e66 100644
--- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
+++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
@@ -247,22 +247,14 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) =>
if (readResult.IsCompleted)
{
Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream");
-
if (this.ConnectState == ConnectState.Connected)
{
// This is an unexpected exit and may be due to a network failure.
- Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: was unexpected");
- this.ConnectState = ConnectState.Disconnected;
-
- // Launch the AfterDisconnect event with a clean disconnect set to false.
- this.AfterDisconnectEventLauncher(false);
-
- await this.cancellationTokenSource.CancelAsync().ConfigureAwait(false);
-
- Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}");
- return false;
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: this was unexpected");
+ await this.HandleDisconnectionAsync(false).ConfigureAwait(false);
}
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}");
return true;
}
@@ -356,7 +348,7 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationTok
case DisconnectPacket disconnectPacket:
Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier}");
Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}");
- this.HandleDisconnection();
+ await this.HandleDisconnectionAsync(false).ConfigureAwait(false);
this.OnDisconnectReceivedEventLauncher(disconnectPacket);
break;
case PingRespPacket pingRespPacket:
@@ -422,6 +414,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket)
var pubRecResponse = new PubRecPacket(publishPacket.PacketIdentifier, PubRecReasonCode.Success);
var publishQoS2Chain = new List { publishPacket, pubRecResponse };
+ // FIXME: Wait for QoS 2 transaction to complete before calling OnMessageReceivedEventLauncher???
if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, publishQoS2Chain) == false)
{
Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an incoming QoS {publishPacket.Message.QoS} publish .");
diff --git a/Source/HiveMQtt/Client/Options/ConnectOptions.cs b/Source/HiveMQtt/Client/Options/ConnectOptions.cs
deleted file mode 100644
index d6724905..00000000
--- a/Source/HiveMQtt/Client/Options/ConnectOptions.cs
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Copyright 2022-present HiveMQ and the HiveMQ Community
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-namespace HiveMQtt.Client.Options;
-
-///
-/// Options class for connect.
-///
-public class ConnectOptions
-{
- public ConnectOptions()
- {
- this.CleanStart = true;
- this.KeepAlive = 60;
- }
-
- public bool CleanStart { get; set; }
-
- public int KeepAlive { get; set; }
-
- public string? ClientId { get; set; }
-}
diff --git a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs
index 58545804..0971f6cd 100644
--- a/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs
+++ b/Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs
@@ -44,6 +44,7 @@ public HiveMQClientOptions()
this.UseTLS = false;
this.AllowInvalidBrokerCertificates = false;
this.ClientCertificates = new X509CertificateCollection();
+ this.ConnectTimeoutInMs = 5000;
}
// Client Identifier to be used in the Client. Will be set automatically if not specified.
@@ -172,6 +173,11 @@ public HiveMQClientOptions()
///
public LastWillAndTestament? LastWillAndTestament { get; set; }
+ ///
+ /// Gets or sets the time in milliseconds to wait for a connection to be established.
+ ///
+ public int ConnectTimeoutInMs { get; set; }
+
///
/// Generate a semi-random client identifier to be used in Client connections.
/// hmqc#-pid-randomstring.
diff --git a/Source/HiveMQtt/Client/Results/PublishResult.cs b/Source/HiveMQtt/Client/Results/PublishResult.cs
index 083769c9..7c403ddd 100644
--- a/Source/HiveMQtt/Client/Results/PublishResult.cs
+++ b/Source/HiveMQtt/Client/Results/PublishResult.cs
@@ -40,23 +40,25 @@ public PublishResult(MQTT5PublishMessage message, PubAckPacket pubAckPacket)
{
this.Message = message;
this.pubAckPacket = pubAckPacket;
+ this.QoS1ReasonCode = pubAckPacket.ReasonCode;
}
public PublishResult(MQTT5PublishMessage message, PubRecPacket pubRecPacket)
{
this.Message = message;
this.pubRecPacket = pubRecPacket;
+ this.QoS2ReasonCode = pubRecPacket.ReasonCode;
}
///
- /// Gets the reason code of the PubAck packet for QoS 1 publishes.
+ /// Gets or sets the reason code of the PubAck packet for QoS 1 publishes.
///
- public PubAckReasonCode? QoS1ReasonCode => this.pubAckPacket?.ReasonCode;
+ public PubAckReasonCode? QoS1ReasonCode { get; set; }
///
- /// Gets the reason code of the PubRec packet for QoS 2 publishes.
+ /// Gets or sets the reason code of the PubRec packet for QoS 2 publishes.
///
- public PubRecReasonCode? QoS2ReasonCode => this.pubRecPacket?.ReasonCode;
+ public PubRecReasonCode? QoS2ReasonCode { get; set; }
///
/// Gets the message that was published.