Skip to content

Commit

Permalink
Client Stability & Test Suite Improvements (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Mar 21, 2024
1 parent 3e360f9 commit e390c57
Show file tree
Hide file tree
Showing 13 changed files with 352 additions and 151 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ jobs:
- name: "Dotnet Cake Build"
run: dotnet cake --target=Build
shell: pwsh
# - name: "Dotnet Cake Test"
# run: dotnet cake --target=Test
# shell: pwsh
- name: "Dotnet Cake Test"
run: dotnet cake --target=Test
shell: pwsh
- name: "Dotnet Cake Pack"
run: dotnet cake --target=Pack
shell: pwsh
Expand Down
28 changes: 28 additions & 0 deletions Benchmarks/ClientBenchmarkApp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,31 @@
The benchmarks are built with [BenchmarkDotNet](https://benchmarkdotnet.org) and can be run with:

`dotnet run ClientBenchmarkApp.csproj -c Release`

# Results - Mar 21, 2024

With release [v0.11.0](https://github.com/hivemq/hivemq-mqtt-client-dotnet/releases/tag/v0.11.0) there was a big performance improvement. All messaging performance was improved but particularly publishing a QoS level 2 message went from ~206ms down to ~1.6ms.

## Previous Performance

| Method | Mean | Error | StdDev | Median |
|------------------------------------------ |-------------:|------------:|------------:|---------------:|
| 'Publish a QoS 0 message' | 390.8 us | 1,842.5 us | 1,218.7 us | 5.646 us |
| 'Publish a QoS 1 message' | 103,722.8 us | 4,330.0 us | 2,864.1 us | 103,536.375 us |
| 'Publish a QoS 2 message' | 202,367.9 us | 26,562.9 us | 17,569.7 us | 206,959.834 us |

## First Pass Refactor Performance

| Method | Mean | Error | StdDev | Median |
|------------------------------------------ |-----------:|-----------:|-----------:|-------------:|
| 'Publish a QoS 0 message' | 401.9 us | 1,876.3 us | 1,241.0 us | 9.250 us |
| 'Publish a QoS 1 message' | 2,140.0 us | 3,568.2 us | 2,360.1 us | 1,324.251 us |
| 'Publish a QoS 2 message' | 4,217.2 us | 5,803.7 us | 3,838.8 us | 2,569.166 us |

## Final Refactor Performance Results (for now 👻)

| Method | Mean | Error | StdDev | Median |
|------------------------------------------ |------------:|----------:|------------:|-------------:|
| 'Publish a QoS 0 message' | 47.11 us | 139.47 us | 411.23 us | 4.875 us |
| 'Publish a QoS 1 message' | 1,210.71 us | 508.64 us | 1,499.75 us | 790.645 us |
| 'Publish a QoS 2 message' | 2,080.46 us | 591.38 us | 1,743.71 us | 1,653.083 us |
57 changes: 34 additions & 23 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient
{
private static readonly NLog.Logger Logger = NLog.LogManager.GetCurrentClassLogger();

private ConnectState connectState = ConnectState.Disconnected;
internal ConnectState ConnectState { get; set; }

public HiveMQClient(HiveMQClientOptions? options = null)
{
this.ConnectState = ConnectState.Disconnected;

options ??= new HiveMQClientOptions();
options.Validate();

Expand All @@ -66,12 +68,12 @@ public HiveMQClient(HiveMQClientOptions? options = null)
public List<Subscription> Subscriptions { get; } = new();

/// <inheritdoc />
public bool IsConnected() => this.connectState == ConnectState.Connected;
public bool IsConnected() => this.ConnectState == ConnectState.Connected;

/// <inheritdoc />
public async Task<ConnectResult> ConnectAsync()
{
this.connectState = ConnectState.Connecting;
this.ConnectState = ConnectState.Connecting;

Logger.Info("Connecting to broker at {0}:{1}", this.Options.Host, this.Options.Port);

Expand All @@ -89,7 +91,7 @@ public async Task<ConnectResult> ConnectAsync()
// Construct the MQTT Connect packet and queue to send
var connPacket = new ConnectPacket(this.Options);
Logger.Trace($"Queuing packet for send: {connPacket}");
this.sendQueue.Add(connPacket);
this.SendQueue.Add(connPacket);

// FIXME: Cancellation token and better timeout value
ConnAckPacket connAck;
Expand All @@ -100,7 +102,7 @@ public async Task<ConnectResult> ConnectAsync()
}
catch (TimeoutException)
{
this.connectState = ConnectState.Disconnected;
this.ConnectState = ConnectState.Disconnected;
throw new HiveMQttClientException("Connect timeout. No response received in time.");
}
finally
Expand All @@ -111,11 +113,11 @@ public async Task<ConnectResult> ConnectAsync()

if (connAck.ReasonCode == ConnAckReasonCode.Success)
{
this.connectState = ConnectState.Connected;
this.ConnectState = ConnectState.Connected;
}
else
{
this.connectState = ConnectState.Disconnected;
this.ConnectState = ConnectState.Disconnected;
}

connectResult = new ConnectResult(connAck.ReasonCode, connAck.SessionPresent, connAck.Properties);
Expand All @@ -133,7 +135,7 @@ public async Task<ConnectResult> ConnectAsync()
/// <inheritdoc />
public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
{
if (this.connectState != ConnectState.Connected)
if (this.ConnectState != ConnectState.Connected)
{
Logger.Warn("DisconnectAsync: Client is not connected.");
return false;
Expand All @@ -152,15 +154,15 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
};

// Once this is set, no more incoming packets or outgoing will be accepted
this.connectState = ConnectState.Disconnecting;
this.ConnectState = ConnectState.Disconnecting;

var taskCompletionSource = new TaskCompletionSource<DisconnectPacket>();
void TaskHandler(object? sender, OnDisconnectSentEventArgs args) => taskCompletionSource.SetResult(args.DisconnectPacket);
EventHandler<OnDisconnectSentEventArgs> eventHandler = TaskHandler;
this.OnDisconnectSent += eventHandler;

Logger.Trace($"Queuing packet for send: {disconnectPacket}");
this.sendQueue.Add(disconnectPacket);
this.SendQueue.Add(disconnectPacket);

try
{
Expand All @@ -176,26 +178,35 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
this.OnDisconnectSent -= eventHandler;
}

// Close the socket
this.HandleDisconnection();

return true;
}

/// <summary>
/// Close the socket and set the connect state to disconnected.
/// </summary>
private void HandleDisconnection()
{
Logger.Debug("HandleDisconnection: Connection lost. Handling Disconnection.");

this.CloseSocket();

// Fire the corresponding event
this.AfterDisconnectEventLauncher(true);
this.AfterDisconnectEventLauncher(false);

this.connectState = ConnectState.Disconnected;
this.ConnectState = ConnectState.Disconnected;

// FIXME
if (this.sendQueue.Count > 0)
if (this.SendQueue.Count > 0)
{
Logger.Warn("Disconnect: Send queue not empty. Packets pending but we are disconnecting.");
Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting (or were disconnected).");
}

// We only clear the send queue on explicit disconnect
while (this.sendQueue.TryTake(out _))
while (this.SendQueue.TryTake(out _))
{
}

return true;
}

/// <inheritdoc />
Expand All @@ -210,7 +221,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
if (message.QoS == QualityOfService.AtMostOnceDelivery)
{
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.sendQueue.Add(publishPacket);
this.SendQueue.Add(publishPacket);
return new PublishResult(publishPacket.Message);
}
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
Expand All @@ -223,7 +234,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)

// Construct the MQTT Connect packet and queue to send
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.sendQueue.Add(publishPacket);
this.SendQueue.Add(publishPacket);

var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);

Expand All @@ -239,7 +250,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
publishPacket.OnPublishQoS2Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
this.sendQueue.Add(publishPacket);
this.SendQueue.Add(publishPacket);

// Wait on the QoS 2 handshake
var packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
Expand Down Expand Up @@ -322,7 +333,7 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
this.OnSubAckReceived += eventHandler;

// Queue the constructed packet to be sent on the wire
this.sendQueue.Add(subscribePacket);
this.SendQueue.Add(subscribePacket);

SubAckPacket subAck;
SubscribeResult subscribeResult;
Expand Down Expand Up @@ -426,7 +437,7 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = TaskHandler;
this.OnUnsubAckReceived += eventHandler;

this.sendQueue.Add(unsubscribePacket);
this.SendQueue.Add(unsubscribePacket);

// FIXME: Cancellation token and better timeout value
UnsubAckPacket unsubAck;
Expand Down
Loading

0 comments on commit e390c57

Please sign in to comment.