diff --git a/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs b/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs
index a55af0f6..92b5c8e5 100644
--- a/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs
+++ b/Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs
@@ -12,13 +12,19 @@ namespace ClientBenchmarkApp;
using HiveMQtt.MQTT5.ReasonCodes;
using HiveMQtt.MQTT5.Types;
-[SimpleJob(RunStrategy.Monitoring, iterationCount: 10, id: "MonitoringJob")]
+[SimpleJob(RunStrategy.Monitoring, iterationCount: 100, id: "MonitoringJob")]
public class ClientBenchmarks : IDisposable
{
private readonly string smallPayload = new string(/*lang=json,strict*/ "{\"interference\": \"1029384\"}");
private HiveMQClient client;
+ public ClientBenchmarks()
+ {
+ Console.WriteLine("Starting HiveMQ client benchmarks...");
+ this.client = null!;
+ }
+
[GlobalSetup]
public async Task SetupAsync()
{
@@ -29,6 +35,7 @@ public async Task SetupAsync()
};
this.client = new HiveMQClient(options);
+
Console.WriteLine($"Connecting to {options.Host} on port {options.Port}...");
await this.client.ConnectAsync().ConfigureAwait(false);
@@ -50,22 +57,24 @@ public async Task CleanUpAsync()
}
[Benchmark(Description = "Publish a QoS 0 messages to the broker.")]
- public async Task PublishQoS0MessageAsync()
- {
- await this.client.PublishAsync("benchmarks/PublishQoS0Messages", this.smallPayload).ConfigureAwait(false);
- }
+ public async Task PublishQoS0MessageAsync() =>
+ await this.client.PublishAsync(
+ "benchmarks/PublishQoS0Messages",
+ this.smallPayload).ConfigureAwait(false);
[Benchmark(Description = "Publish a QoS 1 messages to the broker.")]
- public async Task PublishQoS1MessageAsync()
- {
- await this.client.PublishAsync("benchmarks/PublishQoS1Messages", this.smallPayload, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);
- }
+ public async Task PublishQoS1MessageAsync() =>
+ await this.client.PublishAsync(
+ "benchmarks/PublishQoS1Messages",
+ this.smallPayload,
+ QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);
[Benchmark(Description = "Publish a QoS 2 messages to the broker.")]
- public async Task PublishQoS2MessageAsync()
- {
- await this.client.PublishAsync("benchmarks/PublishQoS1Messages", this.smallPayload, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
- }
+ public async Task PublishQoS2MessageAsync() =>
+ await this.client.PublishAsync(
+ "benchmarks/PublishQoS2Messages",
+ this.smallPayload,
+ QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
public void Dispose() => GC.SuppressFinalize(this);
}
diff --git a/Documentation/docs/how-to/configure-logging.md b/Documentation/docs/how-to/configure-logging.md
index 18796510..2f207bcb 100644
--- a/Documentation/docs/how-to/configure-logging.md
+++ b/Documentation/docs/how-to/configure-logging.md
@@ -23,14 +23,29 @@ The HiveMQtt package uses [NLog](https://github.com/NLog/NLog) and can be config
Setting `minlevel` to `Trace` will output all activity in the HiveMQtt package down to packet and event handling. Using this level will produce a lot of output such as the following:
```log
-2023-10-04 16:56:54.9373|TRACE|HiveMQtt.Client.HiveMQClient|BeforeConnectEventLauncher
-2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|7: TrafficInflowProcessor Starting...Connecting
-2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|9: TrafficOutflowProcessor Starting...Connecting
-2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|--> ConnectPacket
-2023-10-04 16:56:55.0128|TRACE|HiveMQtt.Client.HiveMQClient|OnConnectSentEventLauncher
-2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|<-- ConnAck
-2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|OnConnAckReceivedEventLauncher
-2023-10-04 16:56:55.0379|TRACE|HiveMQtt.Client.HiveMQClient|AfterConnectEventLauncher
+2024-03-14 15:40:18.2252|TRACE|HiveMQtt.Client.HiveMQClient|Trace Level Logging Legend:
+2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(W)- == ConnectionWriter
+2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(R)- == ConnectionReader
+2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(RPH)- == ReceivedPacketsHandler
+2024-03-14 15:40:18.2320|INFO|HiveMQtt.Client.HiveMQClient|Connecting to broker at 127.0.0.1:1883
+2024-03-14 15:40:18.2343|TRACE|HiveMQtt.Client.HiveMQClient|BeforeConnectEventLauncher
+2024-03-14 15:40:18.2460|TRACE|HiveMQtt.Client.HiveMQClient|Socket connected to 127.0.0.1:1883
+2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|Queuing packet for send: HiveMQtt.MQTT5.Packets.ConnectPacket
+2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- Starting...Connecting
+2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(R)- ConnectionReader Starting...Connecting
+2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|5: ConnectionMonitor Starting...Connecting
+2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- 0 received packets currently waiting to be processed.
+2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter Starting...Connecting
+2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter: 1 packets waiting to be sent.
+2024-03-14 15:40:18.2476|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- --> Sending ConnectPacket id=0
+2024-03-14 15:40:18.2529|TRACE|HiveMQtt.Client.HiveMQClient|OnConnectSentEventLauncher
+2024-03-14 15:40:18.2529|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter: 0 packets waiting to be sent.
+2024-03-14 15:40:18.2732|TRACE|HiveMQtt.Client.HiveMQClient|ReadAsync: Read Buffer Length 11
+2024-03-14 15:40:18.2765|TRACE|HiveMQtt.MQTT5.PacketDecoder|PacketDecoder: Decoded Packet: consumed=11, packet=HiveMQtt.MQTT5.Packets.ConnAckPacket id=0
+2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|-(R)- <-- Received ConnAckPacket. Adding to receivedQueue.
+2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- <-- Received ConnAck id=0
+2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|OnConnAckReceivedEventLauncher
+2024-03-14 15:40:18.2775|TRACE|HiveMQtt.Client.HiveMQClient|AfterConnectEventLauncher
```
## See Also
diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs
index da6d18bb..e875bce2 100644
--- a/Source/HiveMQtt/Client/HiveMQClient.cs
+++ b/Source/HiveMQtt/Client/HiveMQClient.cs
@@ -44,8 +44,16 @@ public HiveMQClient(HiveMQClientOptions? options = null)
options ??= new HiveMQClientOptions();
options.Validate();
+ Logger.Trace($"New client created: Client ID: {options.ClientId}");
+
+ Logger.Trace("Trace Level Logging Legend:");
+ Logger.Trace(" -(W)- == ConnectionWriter");
+ Logger.Trace(" -(R)- == ConnectionReader");
+ Logger.Trace(" -(CM)- == ConnectionMonitor");
+ Logger.Trace(" -(RPH)- == ReceivedPacketsHandler");
+
this.Options = options;
- this.cancellationSource = new CancellationTokenSource();
+ this.cancellationTokenSource = new CancellationTokenSource();
}
///
@@ -80,7 +88,8 @@ public async Task ConnectAsync()
// Construct the MQTT Connect packet and queue to send
var connPacket = new ConnectPacket(this.Options);
- this.sendQueue.Enqueue(connPacket);
+ Logger.Trace($"Queuing packet for send: {connPacket}");
+ this.sendQueue.Add(connPacket);
// FIXME: Cancellation token and better timeout value
ConnAckPacket connAck;
@@ -126,6 +135,7 @@ public async Task DisconnectAsync(DisconnectOptions? options = null)
{
if (this.connectState != ConnectState.Connected)
{
+ Logger.Warn("DisconnectAsync: Client is not connected.");
return false;
}
@@ -149,7 +159,8 @@ public async Task DisconnectAsync(DisconnectOptions? options = null)
EventHandler eventHandler = TaskHandler;
this.OnDisconnectSent += eventHandler;
- this.sendQueue.Enqueue(disconnectPacket);
+ Logger.Trace($"Queuing packet for send: {disconnectPacket}");
+ this.sendQueue.Add(disconnectPacket);
try
{
@@ -173,8 +184,17 @@ public async Task DisconnectAsync(DisconnectOptions? options = null)
this.connectState = ConnectState.Disconnected;
- // Clear the send queue
- this.sendQueue.Clear();
+ // FIXME
+ if (this.sendQueue.Count > 0)
+ {
+ Logger.Warn("Disconnect: Send queue not empty. Packets pending but we are disconnecting.");
+ }
+
+ // We only clear the send queue on explicit disconnect
+ while (this.sendQueue.TryTake(out _))
+ {
+ }
+
return true;
}
@@ -189,7 +209,8 @@ public async Task PublishAsync(MQTT5PublishMessage message)
// QoS 0: Fast Service
if (message.QoS == QualityOfService.AtMostOnceDelivery)
{
- this.sendQueue.Enqueue(publishPacket);
+ Logger.Trace($"Queuing packet for send: {publishPacket}");
+ this.sendQueue.Add(publishPacket);
return new PublishResult(publishPacket.Message);
}
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
@@ -201,7 +222,8 @@ public async Task PublishAsync(MQTT5PublishMessage message)
publishPacket.OnPublishQoS1Complete += eventHandler;
// Construct the MQTT Connect packet and queue to send
- this.sendQueue.Enqueue(publishPacket);
+ Logger.Trace($"Queuing packet for send: {publishPacket}");
+ this.sendQueue.Add(publishPacket);
var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
@@ -217,7 +239,7 @@ public async Task PublishAsync(MQTT5PublishMessage message)
publishPacket.OnPublishQoS2Complete += eventHandler;
// Construct the MQTT Connect packet and queue to send
- this.sendQueue.Enqueue(publishPacket);
+ this.sendQueue.Add(publishPacket);
// Wait on the QoS 2 handshake
var packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
@@ -300,7 +322,7 @@ public async Task SubscribeAsync(SubscribeOptions options)
this.OnSubAckReceived += eventHandler;
// Queue the constructed packet to be sent on the wire
- this.sendQueue.Enqueue(subscribePacket);
+ this.sendQueue.Add(subscribePacket);
SubAckPacket subAck;
SubscribeResult subscribeResult;
@@ -404,7 +426,7 @@ public async Task UnsubscribeAsync(UnsubscribeOptions unsubOp
EventHandler eventHandler = TaskHandler;
this.OnUnsubAckReceived += eventHandler;
- this.sendQueue.Enqueue(unsubscribePacket);
+ this.sendQueue.Add(unsubscribePacket);
// FIXME: Cancellation token and better timeout value
UnsubAckPacket unsubAck;
diff --git a/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs b/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs
index c9a83838..597fee84 100644
--- a/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs
+++ b/Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs
@@ -192,7 +192,7 @@ public HiveMQClientOptionsBuilder WithClientCertificate(string clientCertificate
}
else
{
- Logger.Error("File does not exist.");
+ Logger.Error("WithClientCertificate: The specified client certificate file does not exist.");
throw new FileNotFoundException();
}
}
diff --git a/Source/HiveMQtt/Client/HiveMQClientSocket.cs b/Source/HiveMQtt/Client/HiveMQClientSocket.cs
index c79645ff..f25be2cf 100644
--- a/Source/HiveMQtt/Client/HiveMQClientSocket.cs
+++ b/Source/HiveMQtt/Client/HiveMQClientSocket.cs
@@ -34,15 +34,13 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
private Stream? stream;
private PipeReader? reader;
private PipeWriter? writer;
- private CancellationTokenSource cancellationSource;
- private CancellationToken outFlowCancellationToken;
- private CancellationToken inFlowCancellationToken;
- private CancellationToken receivedPacketsCancellationToken;
+ private CancellationTokenSource cancellationTokenSource;
#pragma warning disable IDE0052
- private Task? trafficOutflowProcessorTask;
- private Task? trafficInflowProcessorTask;
- private Task? receivedPacketsProcessorAsync;
+ private Task? connectionWriterTask;
+ private Task? connectionReaderTask;
+ private Task? receivedPacketsHandlerAsync;
+ private Task? connectionMonitorTask;
#pragma warning restore IDE0052
///
@@ -173,18 +171,14 @@ internal async Task ConnectSocketAsync()
this.writer = PipeWriter.Create(this.stream);
// Reset the CancellationTokenSource in case this is a reconnect
- this.cancellationSource.Dispose();
- this.cancellationSource = new CancellationTokenSource();
-
- // Setup the cancellation tokens
- this.outFlowCancellationToken = this.cancellationSource.Token;
- this.inFlowCancellationToken = this.cancellationSource.Token;
- this.receivedPacketsCancellationToken = this.cancellationSource.Token;
+ this.cancellationTokenSource.Dispose();
+ this.cancellationTokenSource = new CancellationTokenSource();
// Start the traffic processors
- this.trafficOutflowProcessorTask = this.TrafficOutflowProcessorAsync(this.outFlowCancellationToken);
- this.trafficInflowProcessorTask = this.TrafficInflowProcessorAsync(this.inFlowCancellationToken);
- this.receivedPacketsProcessorAsync = this.ReceivedPacketsProcessorAsync(this.receivedPacketsCancellationToken);
+ this.connectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token);
+ this.connectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token);
+ this.receivedPacketsHandlerAsync = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token);
+ this.connectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token);
Logger.Trace($"Socket connected to {this.socket.RemoteEndPoint}");
return socketConnected;
@@ -254,6 +248,9 @@ private async Task CreateTLSConnectionAsync(Stream stream)
internal bool CloseSocket(bool? shutdownPipeline = true)
{
+ // Cancel the background traffic processing tasks
+ this.cancellationTokenSource.Cancel();
+
if (shutdownPipeline == true)
{
// Shutdown the pipeline
@@ -265,8 +262,6 @@ internal bool CloseSocket(bool? shutdownPipeline = true)
this.socket?.Shutdown(SocketShutdown.Both);
this.socket?.Close();
- this.cancellationSource.Cancel();
-
return true;
}
}
diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
index 233ec5dd..44fb677a 100644
--- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
+++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
@@ -30,202 +30,237 @@ namespace HiveMQtt.Client;
///
public partial class HiveMQClient : IDisposable, IHiveMQClient
{
- // The outgoing packet queue. Packets queued to be sent.
- private readonly ConcurrentQueue sendQueue = new();
+ private readonly BlockingCollection sendQueue = new();
- private readonly ConcurrentQueue receivedQueue = new();
+ private readonly BlockingCollection receivedQueue = new();
// Transactional packets indexed by packet identifier
private readonly ConcurrentDictionary> transactionQueue = new();
+ private readonly Stopwatch lastCommunicationTimer = new Stopwatch();
+
///
- /// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue.
+ /// Asynchronous background task that monitors the connection state and sends PingReq packets when
+ /// necessary.
///
- private Task TrafficOutflowProcessorAsync(CancellationToken cancellationToken) => Task.Run(
+ /// The cancellation token.
+ /// A boolean return indicating exit state.
+ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task.Run(
async () =>
{
- var stopWatch = new Stopwatch();
var keepAlivePeriod = this.Options.KeepAlive / 2;
+ Logger.Trace($"{this.Options.ClientId}-(CM)- Starting...{this.connectState}");
- stopWatch.Start();
-
- Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Starting...{this.connectState}");
-
- while (this.connectState != ConnectState.Disconnected)
+ while (true)
{
- if (stopWatch.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod))
+ if (cancellationToken.IsCancellationRequested)
{
- // Send PingReq
- Logger.Trace("--> PingReq");
- var writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false);
- this.OnPingReqSentEventLauncher(new PingReqPacket());
- stopWatch.Restart();
+ Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled");
+ break;
}
- if (this.sendQueue.IsEmpty)
+ // If connected and no recent traffic, send a ping
+ if (this.connectState == ConnectState.Connected)
{
- if (this.connectState == ConnectState.Disconnecting)
- {
- return true;
- }
- else
+ if (this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod))
{
- await Task.Delay(50).ConfigureAwait(false);
- continue;
+ // Send PingReq
+ Logger.Trace($"{this.Options.ClientId}-(CM)- --> PingReq");
+ this.sendQueue.Add(new PingReqPacket());
}
}
- Logger.Trace($"TrafficOutflowProcessor: {this.sendQueue.Count} packets waiting to be sent.");
+ try
+ {
+ await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
+ }
+ catch (TaskCanceledException)
+ {
+ Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled");
+ break;
+ }
+ }
+
+ Logger.Trace($"{this.Options.ClientId}-(CM)- Exiting...{this.connectState}");
+
+ return true;
+ }, cancellationToken);
- // Batch load up to 50 queued packets
- List packetsToSend = new();
- while (this.sendQueue.TryDequeue(out var p))
+ ///
+ /// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue.
+ ///
+ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.Run(
+ async () =>
+ {
+ this.lastCommunicationTimer.Start();
+ Logger.Trace($"{this.Options.ClientId}-(W)- Starting...{this.connectState}");
+
+ while (true)
+ {
+ if (cancellationToken.IsCancellationRequested)
{
- packetsToSend.Add(p);
- if (packetsToSend.Count >= 50)
- {
- break;
- }
+ Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled with {this.sendQueue.Count} packets remaining.");
+ break;
}
- Logger.Trace($"TrafficOutflowProcessor: Sending a batch of {packetsToSend.Count} packets.");
- foreach (var packet in packetsToSend)
+ while (this.connectState == ConnectState.Disconnected)
{
- FlushResult writeResult = default;
+ Logger.Trace($"{this.Options.ClientId}-(W)- Not connected. Waiting for connect...");
+ await Task.Delay(2000).ConfigureAwait(false);
+ continue;
+ }
- switch (packet)
- {
- // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time.
- case ConnectPacket connectPacket:
- Logger.Trace($"--> ConnectPacket id={connectPacket.PacketIdentifier}");
- writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false);
- this.OnConnectSentEventLauncher(connectPacket);
- break;
- case DisconnectPacket disconnectPacket:
- Logger.Trace($"--> DisconnectPacket id={disconnectPacket.PacketIdentifier}");
- writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false);
- this.OnDisconnectSentEventLauncher(disconnectPacket);
- break;
- case SubscribePacket subscribePacket:
- Logger.Trace($"--> SubscribePacket id={subscribePacket.PacketIdentifier}");
- writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false);
- this.OnSubscribeSentEventLauncher(subscribePacket);
- break;
- case UnsubscribePacket unsubscribePacket:
- Logger.Trace($"--> UnsubscribePacket id={unsubscribePacket.PacketIdentifier}");
- writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false);
- this.OnUnsubscribeSentEventLauncher(unsubscribePacket);
- break;
- case PublishPacket publishPacket:
- Logger.Trace($"--> PublishPacket id={publishPacket.PacketIdentifier}");
- if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery ||
- publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery)
+ Logger.Trace($"{this.Options.ClientId}-(W)- {this.sendQueue.Count} packets waiting to be sent.");
+
+ var packet = this.sendQueue.Take();
+ FlushResult writeResult = default;
+
+ switch (packet)
+ {
+ // FIXME: Only one connect, subscribe or unsubscribe packet can be sent at a time.
+ case ConnectPacket connectPacket:
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending ConnectPacket id={connectPacket.PacketIdentifier}");
+ writeResult = await this.WriteAsync(connectPacket.Encode()).ConfigureAwait(false);
+ this.OnConnectSentEventLauncher(connectPacket);
+ break;
+ case DisconnectPacket disconnectPacket:
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending DisconnectPacket id={disconnectPacket.PacketIdentifier}");
+ writeResult = await this.WriteAsync(disconnectPacket.Encode()).ConfigureAwait(false);
+ this.OnDisconnectSentEventLauncher(disconnectPacket);
+ break;
+ case SubscribePacket subscribePacket:
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending SubscribePacket id={subscribePacket.PacketIdentifier}");
+ writeResult = await this.WriteAsync(subscribePacket.Encode()).ConfigureAwait(false);
+ this.OnSubscribeSentEventLauncher(subscribePacket);
+ break;
+ case UnsubscribePacket unsubscribePacket:
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending UnsubscribePacket id={unsubscribePacket.PacketIdentifier}");
+ writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false);
+ this.OnUnsubscribeSentEventLauncher(unsubscribePacket);
+ break;
+ case PublishPacket publishPacket:
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PublishPacket id={publishPacket.PacketIdentifier}");
+ if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery ||
+ publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery)
+ {
+ // QoS > 0 - Add to transaction queue
+ if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false)
{
- // QoS > 0 - Add to transaction queue
- if (this.transactionQueue.TryAdd(publishPacket.PacketIdentifier, new List { publishPacket }) == false)
- {
- Logger.Warn("Duplicate packet ID detected.");
- continue;
- }
+ Logger.Warn("Duplicate packet ID detected.");
+ continue;
}
+ }
- writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false);
- this.OnPublishSentEventLauncher(publishPacket);
- break;
- case PubAckPacket pubAckPacket:
- // This is in response to a received Publish packet. Communication chain management
- // was done in the receiver code. Just send the response.
- Logger.Trace($"--> PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}");
- writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false);
- this.OnPubAckSentEventLauncher(pubAckPacket);
- break;
- case PubRecPacket pubRecPacket:
- // This is in response to a received Publish packet. Communication chain management
- // was done in the receiver code. Just send the response.
- Logger.Trace($"--> PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}");
- writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false);
- this.OnPubRecSentEventLauncher(pubRecPacket);
- break;
- case PubRelPacket pubRelPacket:
- // This is in response to a received PubRec packet. Communication chain management
- // was done in the receiver code. Just send the response.
- Logger.Trace($"--> PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}");
- writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false);
- this.OnPubRelSentEventLauncher(pubRelPacket);
- break;
- case PubCompPacket pubCompPacket:
- // This is in response to a received PubRel packet. Communication chain management
- // was done in the receiver code. Just send the response.
- Logger.Trace($"--> PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}");
- writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false);
- this.OnPubCompSentEventLauncher(pubCompPacket);
- break;
- /* case AuthPacket authPacket:
- /* writeResult = await this.writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false);
- /* this.OnAuthSentEventLauncher(authPacket);
- /* break;
- */
- default:
- Logger.Trace($"--> Unknown packet type {packet}");
- break;
-
- } // switch
-
- if (writeResult.IsCanceled)
- {
- Logger.Trace("TrafficOutflowProcessor Write Canceled");
+ writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false);
+ this.OnPublishSentEventLauncher(publishPacket);
+ break;
+ case PubAckPacket pubAckPacket:
+ // This is in response to a received Publish packet. Communication chain management
+ // was done in the receiver code. Just send the response.
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}");
+ writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false);
+ this.OnPubAckSentEventLauncher(pubAckPacket);
+ break;
+ case PubRecPacket pubRecPacket:
+ // This is in response to a received Publish packet. Communication chain management
+ // was done in the receiver code. Just send the response.
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}");
+ writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false);
+ this.OnPubRecSentEventLauncher(pubRecPacket);
+ break;
+ case PubRelPacket pubRelPacket:
+ // This is in response to a received PubRec packet. Communication chain management
+ // was done in the receiver code. Just send the response.
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRelPacket id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}");
+ writeResult = await this.WriteAsync(pubRelPacket.Encode()).ConfigureAwait(false);
+ this.OnPubRelSentEventLauncher(pubRelPacket);
+ break;
+ case PubCompPacket pubCompPacket:
+ // This is in response to a received PubRel packet. Communication chain management
+ // was done in the receiver code. Just send the response.
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubCompPacket id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}");
+ writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false);
+ this.OnPubCompSentEventLauncher(pubCompPacket);
+ break;
+ case PingReqPacket pingReqPacket:
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}");
+ writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false);
+ this.OnPingReqSentEventLauncher(pingReqPacket);
break;
- }
- if (writeResult.IsCompleted)
- {
- Logger.Trace("TrafficOutflowProcessor IsCompleted: end of the stream");
+ /* case AuthPacket authPacket:
+ /* writeResult = await this.writer.WriteAsync(authPacket.Encode()).ConfigureAwait(false);
+ /* this.OnAuthSentEventLauncher(authPacket);
+ /* break;
+ */
+ default:
+ Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}");
break;
- }
- stopWatch.Restart();
- } // foreach
- } // while
+ } // switch
+
+ // if (writeResult.IsCancelled)
+ // {
+ // Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled");
+ // break;
+ // }
- Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficOutflowProcessor Exiting...{this.connectState}");
+ if (writeResult.IsCompleted)
+ {
+ Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream");
+ break;
+ }
+
+ this.lastCommunicationTimer.Restart();
+ } // foreach
+
+ Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.connectState}");
return true;
}, cancellationToken);
///
/// Asynchronous background task that handles the incoming traffic of packets. Received packets
- /// are queued into this.receivedQueue for processing by ReceivedPacketsProcessorAsync.
+ /// are queued into this.receivedQueue for processing by ReceivedPacketsHandlerAsync.
///
- private Task TrafficInflowProcessorAsync(CancellationToken cancellationToken) => Task.Run(
+ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => Task.Run(
async () =>
{
- Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Starting...{this.connectState}");
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Starting...{this.connectState}");
ReadResult readResult;
while (this.connectState is ConnectState.Connecting or ConnectState.Connected)
{
- readResult = await this.ReadAsync().ConfigureAwait(false);
-
- if (cancellationToken.IsCancellationRequested || readResult.IsCanceled)
+ if (cancellationToken.IsCancellationRequested)
{
- Logger.Trace("TrafficInflowProcessor exiting due to cancellation: {cancellationToken.IsCancellationRequested} {readResult.IsCanceled}");
+ Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled");
break;
}
+ readResult = await this.ReadAsync().ConfigureAwait(false);
+
+ // if (readResult.IsCancelled)
+ // {
+ // Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result.");
+ // break;
+ // }
+
if (readResult.IsCompleted)
{
- Logger.Trace("TrafficInflowProcessor IsCompleted: end of the stream");
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream");
if (this.connectState == ConnectState.Connected)
{
// This is an unexpected exit and may be due to a network failure.
- Logger.Trace("TrafficInflowProcessor IsCompleted: was unexpected");
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: was unexpected");
this.connectState = ConnectState.Disconnected;
// Launch the AfterDisconnect event with a clean disconnect set to false.
this.AfterDisconnectEventLauncher(false);
- this.cancellationSource.Cancel();
+ this.cancellationTokenSource.Cancel();
+
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.connectState}");
return false;
}
@@ -241,7 +276,7 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok
if (decodedPacket is MalformedPacket)
{
Logger.Warn($"Malformed packet received. Disconnecting.");
- Logger.Debug($"Malformed packet received: {decodedPacket}");
+ Logger.Debug($"{this.Options.ClientId}-(R)- Malformed packet received: {decodedPacket}");
var opts = new DisconnectOptions
{
@@ -255,7 +290,7 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok
// Advance the reader to the end of the consumed data
buffer = buffer.Slice(0, consumed);
this.reader?.AdvanceTo(buffer.Start, readResult.Buffer.End);
- Logger.Trace("TrafficInflowProcessor: PacketDecoder.TryDecode returned false. Waiting for more data...");
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader: PacketDecoder.TryDecode returned false. Waiting for more data...");
break;
}
@@ -264,14 +299,14 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok
this.reader?.AdvanceTo(buffer.Start);
// Add the packet to the received queue for processing later
- // by ReceivedPacketsProcessorAsync
- this.receivedQueue.Enqueue(decodedPacket);
+ // by ReceivedPacketsHandlerAsync
+ Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name}. Adding to receivedQueue.");
+ this.receivedQueue.Add(decodedPacket);
} // while (buffer.Length > 0
} // while (this.connectState is ConnectState.Connecting or ConnectState.Connected)
- Logger.Trace($"{Environment.CurrentManagedThreadId}: TrafficInflowProcessor Exiting...{this.connectState}");
-
+ Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.connectState}");
return true;
}, cancellationToken);
@@ -280,79 +315,68 @@ private Task TrafficInflowProcessorAsync(CancellationToken cancellationTok
///
/// The cancellation token to stop the task.
/// A fairly worthless boolean.
- private Task ReceivedPacketsProcessorAsync(CancellationToken cancellationToken) => Task.Run(
- async () =>
+ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Task.Run(
+ () =>
{
- Logger.Trace($"{Environment.CurrentManagedThreadId}: ReceivedPacketsProcessor Starting...{this.connectState}");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- Starting...{this.connectState}");
while (true)
{
- if (this.receivedQueue.IsEmpty)
- {
- await Task.Delay(50).ConfigureAwait(false);
- continue;
- }
- else
- {
- Logger.Trace($"ReceivedPacketsProcessor: {this.receivedQueue.Count} received packets waiting to be processed.");
- }
-
- if (this.receivedQueue.TryDequeue(out var packet))
- {
- switch (packet)
- {
- case ConnAckPacket connAckPacket:
- Logger.Trace($"<-- ConnAck id={connAckPacket.PacketIdentifier}");
- this.OnConnAckReceivedEventLauncher(connAckPacket);
- break;
- case DisconnectPacket disconnectPacket:
- Logger.Trace($"<-- Disconnect id={disconnectPacket.PacketIdentifier}");
- Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}");
- this.OnDisconnectReceivedEventLauncher(disconnectPacket);
- break;
- case PingRespPacket pingRespPacket:
- Logger.Trace($"<-- PingResp id={pingRespPacket.PacketIdentifier}");
- this.OnPingRespReceivedEventLauncher(pingRespPacket);
- break;
- case SubAckPacket subAckPacket:
- Logger.Trace($"<-- SubAck id={subAckPacket.PacketIdentifier}");
- this.OnSubAckReceivedEventLauncher(subAckPacket);
- break;
- case UnsubAckPacket unsubAckPacket:
- Logger.Trace($"<-- UnsubAck id={unsubAckPacket.PacketIdentifier}");
- this.OnUnsubAckReceivedEventLauncher(unsubAckPacket);
- break;
- case PublishPacket publishPacket:
- this.HandleIncomingPublishPacket(publishPacket);
- break;
- case PubAckPacket pubAckPacket:
- this.HandleIncomingPubAckPacket(pubAckPacket);
- break;
- case PubRecPacket pubRecPacket:
- this.HandleIncomingPubRecPacket(pubRecPacket);
- break;
- case PubRelPacket pubRelPacket:
- this.HandleIncomingPubRelPacket(pubRelPacket);
- break;
- case PubCompPacket pubCompPacket:
- this.HandleIncomingPubCompPacket(pubCompPacket);
- break;
- default:
- Logger.Trace("<-- Unknown");
- Logger.Error($"Unrecognized packet received. Will discard. {packet}");
- break;
- } // switch (packet)
- }
-
if (cancellationToken.IsCancellationRequested)
{
- Logger.Trace("ReceivedPacketsProcessor Canceled");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.receivedQueue.Count} received packets remaining.");
break;
}
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- {this.receivedQueue.Count} received packets currently waiting to be processed.");
+
+ var packet = this.receivedQueue.Take();
+ switch (packet)
+ {
+ case ConnAckPacket connAckPacket:
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}");
+ this.OnConnAckReceivedEventLauncher(connAckPacket);
+ break;
+ case DisconnectPacket disconnectPacket:
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Disconnect id={disconnectPacket.PacketIdentifier}");
+ Logger.Warn($"Disconnect received: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}");
+ this.OnDisconnectReceivedEventLauncher(disconnectPacket);
+ break;
+ case PingRespPacket pingRespPacket:
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}");
+ this.OnPingRespReceivedEventLauncher(pingRespPacket);
+ break;
+ case SubAckPacket subAckPacket:
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}");
+ this.OnSubAckReceivedEventLauncher(subAckPacket);
+ break;
+ case UnsubAckPacket unsubAckPacket:
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}");
+ this.OnUnsubAckReceivedEventLauncher(unsubAckPacket);
+ break;
+ case PublishPacket publishPacket:
+ this.HandleIncomingPublishPacket(publishPacket);
+ break;
+ case PubAckPacket pubAckPacket:
+ this.HandleIncomingPubAckPacket(pubAckPacket);
+ break;
+ case PubRecPacket pubRecPacket:
+ this.HandleIncomingPubRecPacket(pubRecPacket);
+ break;
+ case PubRelPacket pubRelPacket:
+ this.HandleIncomingPubRelPacket(pubRelPacket);
+ break;
+ case PubCompPacket pubCompPacket:
+ this.HandleIncomingPubCompPacket(pubCompPacket);
+ break;
+ default:
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Unknown packet type. Will discard.");
+ Logger.Error($"Unrecognized packet received. Will discard. {packet}");
+ break;
+ } // switch (packet)
} // while
- Logger.Trace($"{Environment.CurrentManagedThreadId}: ReceivedPacketsProcessor Exiting...{this.connectState}");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.connectState}");
return true;
}, cancellationToken);
@@ -363,14 +387,14 @@ private Task ReceivedPacketsProcessorAsync(CancellationToken cancellationT
/// The received publish packet.
internal void HandleIncomingPublishPacket(PublishPacket publishPacket)
{
- Logger.Trace($"<-- Publish id={publishPacket.PacketIdentifier}");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received Publish id={publishPacket.PacketIdentifier}");
this.OnMessageReceivedEventLauncher(publishPacket);
if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.AtLeastOnceDelivery)
{
// We've received a QoS 1 publish. Send a PubAck.
var pubAckResponse = new PubAckPacket(publishPacket.PacketIdentifier, PubAckReasonCode.Success);
- this.sendQueue.Enqueue(pubAckResponse);
+ this.sendQueue.Add(pubAckResponse);
}
else if (publishPacket.Message.QoS is MQTT5.Types.QualityOfService.ExactlyOnceDelivery)
{
@@ -384,7 +408,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket)
pubRecResponse.ReasonCode = PubRecReasonCode.PacketIdentifierInUse;
}
- this.sendQueue.Enqueue(pubRecResponse);
+ this.sendQueue.Add(pubRecResponse);
}
}
@@ -395,7 +419,7 @@ internal void HandleIncomingPublishPacket(PublishPacket publishPacket)
/// Raised if the packet identifier is unknown.
internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket)
{
- Logger.Trace($"<-- PubAck id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubAck id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}");
this.OnPubAckReceivedEventLauncher(pubAckPacket);
// Remove the transaction chain from the transaction queue
@@ -419,7 +443,7 @@ internal void HandleIncomingPubAckPacket(PubAckPacket pubAckPacket)
/// The received PubRec packet.
internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket)
{
- Logger.Trace($"<-- PubRec id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRec id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}");
this.OnPubRecReceivedEventLauncher(pubRecPacket);
// Find the QoS2 transaction chain for this packet identifier
@@ -443,13 +467,13 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket)
}
// Send the PUBREL response
- this.sendQueue.Enqueue(pubRelResponsePacket);
+ this.sendQueue.Add(pubRelResponsePacket);
}
else
{
// Send a PUBREL with PacketIdentifierNotFound
var pubRelResponsePacket = new PubRelPacket(pubRecPacket.PacketIdentifier, PubRelReasonCode.PacketIdentifierNotFound);
- this.sendQueue.Enqueue(pubRelResponsePacket);
+ this.sendQueue.Add(pubRelResponsePacket);
}
}
@@ -460,7 +484,7 @@ internal void HandleIncomingPubRecPacket(PubRecPacket pubRecPacket)
/// The received PubRel packet.
internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket)
{
- Logger.Trace($"<-- PubRel id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubRel id={pubRelPacket.PacketIdentifier} reason={pubRelPacket.ReasonCode}");
this.OnPubRelReceivedEventLauncher(pubRelPacket);
if (this.transactionQueue.TryGetValue(pubRelPacket.PacketIdentifier, out var originalPublishQoS2Chain))
@@ -485,7 +509,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket)
Logger.Warn($"QoS2: Couldn't remove PubRel --> PubComp QoS2 Chain for packet identifier {pubRelPacket.PacketIdentifier}.");
}
- this.sendQueue.Enqueue(pubCompResponsePacket);
+ this.sendQueue.Add(pubCompResponsePacket);
}
else
{
@@ -494,7 +518,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket)
// Send a PUBCOMP with PacketIdentifierNotFound
var pubCompResponsePacket = new PubCompPacket(pubRelPacket.PacketIdentifier, PubCompReasonCode.PacketIdentifierNotFound);
- this.sendQueue.Enqueue(pubCompResponsePacket);
+ this.sendQueue.Add(pubCompResponsePacket);
}
}
@@ -505,7 +529,7 @@ internal void HandleIncomingPubRelPacket(PubRelPacket pubRelPacket)
/// Raised if the packet identifier is unknown.
internal void HandleIncomingPubCompPacket(PubCompPacket pubCompPacket)
{
- Logger.Trace($"<-- PubComp id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}");
+ Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PubComp id={pubCompPacket.PacketIdentifier} reason={pubCompPacket.ReasonCode}");
this.OnPubCompReceivedEventLauncher(pubCompPacket);
// Remove the QoS 2 transaction chain from the queue
diff --git a/Source/HiveMQtt/Client/HiveMQClientUtil.cs b/Source/HiveMQtt/Client/HiveMQClientUtil.cs
index c69646c6..de6ac950 100644
--- a/Source/HiveMQtt/Client/HiveMQClientUtil.cs
+++ b/Source/HiveMQtt/Client/HiveMQClientUtil.cs
@@ -131,6 +131,20 @@ public static bool MatchTopic(string pattern, string candidate)
return Regex.IsMatch(candidate, regexp);
}
+ ///
+ /// Generate a packet identifier.
+ ///
+ /// A valid packet identifier.
+ protected int GeneratePacketIdentifier()
+ {
+ if (this.lastPacketId == ushort.MaxValue)
+ {
+ this.lastPacketId = 0;
+ }
+
+ return Interlocked.Increment(ref this.lastPacketId);
+ }
+
///
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0.
///
@@ -147,20 +161,6 @@ from executing a second time.
GC.SuppressFinalize(this);
}
- ///
- /// Generate a packet identifier.
- ///
- /// A valid packet identifier.
- protected int GeneratePacketIdentifier()
- {
- if (this.lastPacketId == ushort.MaxValue)
- {
- this.lastPacketId = 0;
- }
-
- return Interlocked.Increment(ref this.lastPacketId);
- }
-
///
/// https://learn.microsoft.com/en-us/dotnet/api/system.idisposable?view=net-6.0
/// Dispose(bool disposing) executes in two distinct scenarios.
@@ -174,6 +174,8 @@ protected int GeneratePacketIdentifier()
/// True if called from user code.
protected virtual void Dispose(bool disposing)
{
+ Logger.Trace("Disposing HiveMQClient");
+
// Check to see if Dispose has already been called.
if (!this.disposed)
{
@@ -181,8 +183,18 @@ protected virtual void Dispose(bool disposing)
// and unmanaged resources.
if (disposing)
{
+ if (this.connectState == Internal.ConnectState.Connected)
+ {
+ Logger.Trace("HiveMQClient Dispose: Disconnecting connected client.");
+ _ = Task.Run(async () => await this.DisconnectAsync().ConfigureAwait(false));
+ }
+
// Dispose managed resources.
- // { }
+ this.sendQueue.CompleteAdding();
+ this.receivedQueue.CompleteAdding();
+
+ this.cancellationTokenSource.Cancel();
+ this.cancellationTokenSource.Dispose();
}
// Call the appropriate methods to clean up
diff --git a/Source/HiveMQtt/MQTT5/PacketDecoder.cs b/Source/HiveMQtt/MQTT5/PacketDecoder.cs
index 2d21b00e..127de369 100644
--- a/Source/HiveMQtt/MQTT5/PacketDecoder.cs
+++ b/Source/HiveMQtt/MQTT5/PacketDecoder.cs
@@ -54,7 +54,7 @@ public static bool TryDecode(ReadOnlySequence buffer, out ControlPacket de
if (buffer.Length < packetLength)
{
// Not all data for this packet has arrived yet. Try again...
- Logger.Trace("PacketDecoder.Decode: Not all data for this packet has arrived yet. Returning PartialPacket.");
+ Logger.Trace($"PacketDecoder.TryDecode: Waiting on more data: {buffer.Length} < {packetLength} - Returning PartialPacket.");
decodedPacket = new PartialPacket();
consumed = default;
return false;
diff --git a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs
index f7a32d1f..b70d9b3b 100644
--- a/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs
+++ b/Tests/HiveMQtt.Test/HiveMQClient/OptionsTest.cs
@@ -34,7 +34,7 @@ public void WithBadSessionExpiryInterval()
Assert.Equal(0, options.SessionExpiryInterval);
options.SessionExpiryInterval = long.MaxValue;
- options.ValidateOptions();
+ options.Validate();
Assert.Equal(uint.MaxValue, options.SessionExpiryInterval);
}