Skip to content

Commit

Permalink
Performance Improvements (#133)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Mar 19, 2024
1 parent 5c0dbfb commit 7e9680a
Show file tree
Hide file tree
Showing 9 changed files with 358 additions and 281 deletions.
35 changes: 22 additions & 13 deletions Benchmarks/ClientBenchmarkApp/ClientBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ namespace ClientBenchmarkApp;
using HiveMQtt.MQTT5.ReasonCodes;
using HiveMQtt.MQTT5.Types;

[SimpleJob(RunStrategy.Monitoring, iterationCount: 10, id: "MonitoringJob")]
[SimpleJob(RunStrategy.Monitoring, iterationCount: 100, id: "MonitoringJob")]
public class ClientBenchmarks : IDisposable
{
private readonly string smallPayload = new string(/*lang=json,strict*/ "{\"interference\": \"1029384\"}");

private HiveMQClient client;

public ClientBenchmarks()
{
Console.WriteLine("Starting HiveMQ client benchmarks...");
this.client = null!;
}

[GlobalSetup]
public async Task SetupAsync()
{
Expand All @@ -29,6 +35,7 @@ public async Task SetupAsync()
};

this.client = new HiveMQClient(options);

Console.WriteLine($"Connecting to {options.Host} on port {options.Port}...");
await this.client.ConnectAsync().ConfigureAwait(false);

Expand All @@ -50,22 +57,24 @@ public async Task CleanUpAsync()
}

[Benchmark(Description = "Publish a QoS 0 messages to the broker.")]
public async Task PublishQoS0MessageAsync()
{
await this.client.PublishAsync("benchmarks/PublishQoS0Messages", this.smallPayload).ConfigureAwait(false);
}
public async Task PublishQoS0MessageAsync() =>
await this.client.PublishAsync(
"benchmarks/PublishQoS0Messages",
this.smallPayload).ConfigureAwait(false);

[Benchmark(Description = "Publish a QoS 1 messages to the broker.")]
public async Task PublishQoS1MessageAsync()
{
await this.client.PublishAsync("benchmarks/PublishQoS1Messages", this.smallPayload, QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);
}
public async Task PublishQoS1MessageAsync() =>
await this.client.PublishAsync(
"benchmarks/PublishQoS1Messages",
this.smallPayload,
QualityOfService.AtLeastOnceDelivery).ConfigureAwait(false);

[Benchmark(Description = "Publish a QoS 2 messages to the broker.")]
public async Task PublishQoS2MessageAsync()
{
await this.client.PublishAsync("benchmarks/PublishQoS1Messages", this.smallPayload, QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);
}
public async Task PublishQoS2MessageAsync() =>
await this.client.PublishAsync(
"benchmarks/PublishQoS2Messages",
this.smallPayload,
QualityOfService.ExactlyOnceDelivery).ConfigureAwait(false);

public void Dispose() => GC.SuppressFinalize(this);
}
31 changes: 23 additions & 8 deletions Documentation/docs/how-to/configure-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,29 @@ The HiveMQtt package uses [NLog](https://github.com/NLog/NLog) and can be config
Setting `minlevel` to `Trace` will output all activity in the HiveMQtt package down to packet and event handling. Using this level will produce a lot of output such as the following:

```log
2023-10-04 16:56:54.9373|TRACE|HiveMQtt.Client.HiveMQClient|BeforeConnectEventLauncher
2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|7: TrafficInflowProcessor Starting...Connecting
2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|9: TrafficOutflowProcessor Starting...Connecting
2023-10-04 16:56:55.0081|TRACE|HiveMQtt.Client.HiveMQClient|--> ConnectPacket
2023-10-04 16:56:55.0128|TRACE|HiveMQtt.Client.HiveMQClient|OnConnectSentEventLauncher
2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|<-- ConnAck
2023-10-04 16:56:55.0374|TRACE|HiveMQtt.Client.HiveMQClient|OnConnAckReceivedEventLauncher
2023-10-04 16:56:55.0379|TRACE|HiveMQtt.Client.HiveMQClient|AfterConnectEventLauncher
2024-03-14 15:40:18.2252|TRACE|HiveMQtt.Client.HiveMQClient|Trace Level Logging Legend:
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(W)- == ConnectionWriter
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(R)- == ConnectionReader
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(RPH)- == ReceivedPacketsHandler
2024-03-14 15:40:18.2320|INFO|HiveMQtt.Client.HiveMQClient|Connecting to broker at 127.0.0.1:1883
2024-03-14 15:40:18.2343|TRACE|HiveMQtt.Client.HiveMQClient|BeforeConnectEventLauncher
2024-03-14 15:40:18.2460|TRACE|HiveMQtt.Client.HiveMQClient|Socket connected to 127.0.0.1:1883
2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|Queuing packet for send: HiveMQtt.MQTT5.Packets.ConnectPacket
2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- Starting...Connecting
2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(R)- ConnectionReader Starting...Connecting
2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|5: ConnectionMonitor Starting...Connecting
2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- 0 received packets currently waiting to be processed.
2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter Starting...Connecting
2024-03-14 15:40:18.2464|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter: 1 packets waiting to be sent.
2024-03-14 15:40:18.2476|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- --> Sending ConnectPacket id=0
2024-03-14 15:40:18.2529|TRACE|HiveMQtt.Client.HiveMQClient|OnConnectSentEventLauncher
2024-03-14 15:40:18.2529|TRACE|HiveMQtt.Client.HiveMQClient|-(W)- ConnectionWriter: 0 packets waiting to be sent.
2024-03-14 15:40:18.2732|TRACE|HiveMQtt.Client.HiveMQClient|ReadAsync: Read Buffer Length 11
2024-03-14 15:40:18.2765|TRACE|HiveMQtt.MQTT5.PacketDecoder|PacketDecoder: Decoded Packet: consumed=11, packet=HiveMQtt.MQTT5.Packets.ConnAckPacket id=0
2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|-(R)- <-- Received ConnAckPacket. Adding to receivedQueue.
2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|-(RPH)- <-- Received ConnAck id=0
2024-03-14 15:40:18.2765|TRACE|HiveMQtt.Client.HiveMQClient|OnConnAckReceivedEventLauncher
2024-03-14 15:40:18.2775|TRACE|HiveMQtt.Client.HiveMQClient|AfterConnectEventLauncher
```

## See Also
Expand Down
42 changes: 32 additions & 10 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,16 @@ public HiveMQClient(HiveMQClientOptions? options = null)
options ??= new HiveMQClientOptions();
options.Validate();

Logger.Trace($"New client created: Client ID: {options.ClientId}");

Logger.Trace("Trace Level Logging Legend:");
Logger.Trace(" -(W)- == ConnectionWriter");
Logger.Trace(" -(R)- == ConnectionReader");
Logger.Trace(" -(CM)- == ConnectionMonitor");
Logger.Trace(" -(RPH)- == ReceivedPacketsHandler");

this.Options = options;
this.cancellationSource = new CancellationTokenSource();
this.cancellationTokenSource = new CancellationTokenSource();
}

/// <inheritdoc />
Expand Down Expand Up @@ -80,7 +88,8 @@ public async Task<ConnectResult> ConnectAsync()

// Construct the MQTT Connect packet and queue to send
var connPacket = new ConnectPacket(this.Options);
this.sendQueue.Enqueue(connPacket);
Logger.Trace($"Queuing packet for send: {connPacket}");
this.sendQueue.Add(connPacket);

// FIXME: Cancellation token and better timeout value
ConnAckPacket connAck;
Expand Down Expand Up @@ -126,6 +135,7 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
{
if (this.connectState != ConnectState.Connected)
{
Logger.Warn("DisconnectAsync: Client is not connected.");
return false;
}

Expand All @@ -149,7 +159,8 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
EventHandler<OnDisconnectSentEventArgs> eventHandler = TaskHandler;
this.OnDisconnectSent += eventHandler;

this.sendQueue.Enqueue(disconnectPacket);
Logger.Trace($"Queuing packet for send: {disconnectPacket}");
this.sendQueue.Add(disconnectPacket);

try
{
Expand All @@ -173,8 +184,17 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)

this.connectState = ConnectState.Disconnected;

// Clear the send queue
this.sendQueue.Clear();
// FIXME
if (this.sendQueue.Count > 0)
{
Logger.Warn("Disconnect: Send queue not empty. Packets pending but we are disconnecting.");
}

// We only clear the send queue on explicit disconnect
while (this.sendQueue.TryTake(out _))
{
}

return true;
}

Expand All @@ -189,7 +209,8 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
// QoS 0: Fast Service
if (message.QoS == QualityOfService.AtMostOnceDelivery)
{
this.sendQueue.Enqueue(publishPacket);
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.sendQueue.Add(publishPacket);
return new PublishResult(publishPacket.Message);
}
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
Expand All @@ -201,7 +222,8 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
publishPacket.OnPublishQoS1Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
this.sendQueue.Enqueue(publishPacket);
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.sendQueue.Add(publishPacket);

var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);

Expand All @@ -217,7 +239,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
publishPacket.OnPublishQoS2Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
this.sendQueue.Enqueue(publishPacket);
this.sendQueue.Add(publishPacket);

// Wait on the QoS 2 handshake
var packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
Expand Down Expand Up @@ -300,7 +322,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
this.OnSubAckReceived += eventHandler;

// Queue the constructed packet to be sent on the wire
this.sendQueue.Enqueue(subscribePacket);
this.sendQueue.Add(subscribePacket);

SubAckPacket subAck;
SubscribeResult subscribeResult;
Expand Down Expand Up @@ -404,7 +426,7 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = TaskHandler;
this.OnUnsubAckReceived += eventHandler;

this.sendQueue.Enqueue(unsubscribePacket);
this.sendQueue.Add(unsubscribePacket);

// FIXME: Cancellation token and better timeout value
UnsubAckPacket unsubAck;
Expand Down
2 changes: 1 addition & 1 deletion Source/HiveMQtt/Client/HiveMQClientOptionsBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public HiveMQClientOptionsBuilder WithClientCertificate(string clientCertificate
}
else
{
Logger.Error("File does not exist.");
Logger.Error("WithClientCertificate: The specified client certificate file does not exist.");
throw new FileNotFoundException();
}
}
Expand Down
33 changes: 14 additions & 19 deletions Source/HiveMQtt/Client/HiveMQClientSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,13 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
private Stream? stream;
private PipeReader? reader;
private PipeWriter? writer;
private CancellationTokenSource cancellationSource;
private CancellationToken outFlowCancellationToken;
private CancellationToken inFlowCancellationToken;
private CancellationToken receivedPacketsCancellationToken;
private CancellationTokenSource cancellationTokenSource;

#pragma warning disable IDE0052
private Task? trafficOutflowProcessorTask;
private Task? trafficInflowProcessorTask;
private Task? receivedPacketsProcessorAsync;
private Task? connectionWriterTask;
private Task? connectionReaderTask;
private Task? receivedPacketsHandlerAsync;
private Task? connectionMonitorTask;
#pragma warning restore IDE0052

/// <summary>
Expand Down Expand Up @@ -173,18 +171,14 @@ internal async Task<bool> ConnectSocketAsync()
this.writer = PipeWriter.Create(this.stream);

// Reset the CancellationTokenSource in case this is a reconnect
this.cancellationSource.Dispose();
this.cancellationSource = new CancellationTokenSource();

// Setup the cancellation tokens
this.outFlowCancellationToken = this.cancellationSource.Token;
this.inFlowCancellationToken = this.cancellationSource.Token;
this.receivedPacketsCancellationToken = this.cancellationSource.Token;
this.cancellationTokenSource.Dispose();
this.cancellationTokenSource = new CancellationTokenSource();

// Start the traffic processors
this.trafficOutflowProcessorTask = this.TrafficOutflowProcessorAsync(this.outFlowCancellationToken);
this.trafficInflowProcessorTask = this.TrafficInflowProcessorAsync(this.inFlowCancellationToken);
this.receivedPacketsProcessorAsync = this.ReceivedPacketsProcessorAsync(this.receivedPacketsCancellationToken);
this.connectionWriterTask = this.ConnectionWriterAsync(this.cancellationTokenSource.Token);
this.connectionReaderTask = this.ConnectionReaderAsync(this.cancellationTokenSource.Token);
this.receivedPacketsHandlerAsync = this.ReceivedPacketsHandlerAsync(this.cancellationTokenSource.Token);
this.connectionMonitorTask = this.ConnectionMonitorAsync(this.cancellationTokenSource.Token);

Logger.Trace($"Socket connected to {this.socket.RemoteEndPoint}");
return socketConnected;
Expand Down Expand Up @@ -254,6 +248,9 @@ private async Task<bool> CreateTLSConnectionAsync(Stream stream)

internal bool CloseSocket(bool? shutdownPipeline = true)
{
// Cancel the background traffic processing tasks
this.cancellationTokenSource.Cancel();

if (shutdownPipeline == true)
{
// Shutdown the pipeline
Expand All @@ -265,8 +262,6 @@ internal bool CloseSocket(bool? shutdownPipeline = true)
this.socket?.Shutdown(SocketShutdown.Both);
this.socket?.Close();

this.cancellationSource.Cancel();

return true;
}
}
Loading

0 comments on commit 7e9680a

Please sign in to comment.