Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved Wait Strategy for Publish Transactions #164

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,37 +207,25 @@
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
{
// QoS 1: Acknowledged Delivery
var taskCompletionSource = new TaskCompletionSource<PubAckPacket>();
void TaskHandler(object? sender, OnPublishQoS1CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubAckPacket);
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS1Complete += eventHandler;

Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);

publishPacket.OnPublishQoS1Complete -= eventHandler;
// Wait on the QoS 1 handshake
var pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
return new PublishResult(publishPacket.Message, pubAckPacket);
}
else if (message.QoS == QualityOfService.ExactlyOnceDelivery)
{
// QoS 2: Assured Delivery
PublishResult? publishResult = null;
var taskCompletionSource = new TaskCompletionSource<List<ControlPacket>>();
void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PacketList);
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS2Complete += eventHandler;

Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

List<ControlPacket> packetList;
try
{
// Wait on the QoS 2 handshake
// FIXME: Timeout value
packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand All @@ -254,7 +242,6 @@
{
QoS2ReasonCode = null,
};
publishPacket.OnPublishQoS2Complete -= eventHandler;
return publishResult;
}

Expand All @@ -271,8 +258,6 @@
throw new HiveMQttClientException("PublishAsync: QoS 2 complete but no PubRec packet received.");
}

// Remove our wait handler
publishPacket.OnPublishQoS2Complete -= eventHandler;
return publishResult;
}

Expand Down Expand Up @@ -497,13 +482,13 @@
/// Close the socket and set the connect state to disconnected.
/// </summary>
/// <param name="clean">Indicates whether the disconnect was intended or not.</param>
private async Task<bool> HandleDisconnectionAsync(bool clean = true)

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.

Check warning on line 485 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

This async method lacks 'await' operators and will run synchronously. Consider using the 'await' operator to await non-blocking API calls, or 'await Task.Run(...)' to do CPU-bound work on a background thread.
{
Logger.Debug($"HandleDisconnection: Handling disconnection. clean={clean}.");

// Cancel all background tasks and close the socket
this.ConnectState = ConnectState.Disconnected;
this.cancellationTokenSource.Cancel();

Check warning on line 491 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)

Check warning on line 491 in Source/HiveMQtt/Client/HiveMQClient.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Cancel synchronously blocks. Await CancelAsync instead. (https://github.com/Microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD103.md)
this.CloseSocket();

if (clean)
Expand Down
64 changes: 58 additions & 6 deletions Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@
{
this.PacketIdentifier = (ushort)packetIdentifier;
this.Message = message;

// Setup the QoS 1 TaskCompletionSource so users can simply call
//
// await PublishPacket.OnPublishQoS1CompleteTCS
//
// to wait for the QoS 1 publish to complete.
this.OnPublishQoS1Complete += (sender, args) => this.OnPublishQoS1CompleteTCS.SetResult(args.PubAckPacket);

// Setup the QoS 2 TaskCompletionSource so users can simply call
//
// await PublishPacket.OnPublishQoS2CompleteTCS
//
// to wait for the QoS 2 publish to complete.
this.OnPublishQoS2Complete += (sender, args) => this.OnPublishQoS2CompleteTCS.SetResult(args.PacketList);
}

/// <summary>
Expand Down Expand Up @@ -66,23 +80,61 @@

internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet)
{
var eventArgs = new OnPublishQoS1CompleteEventArgs(packet);
Logger.Trace("OnPublishQoS1CompleteEventLauncher");
this.OnPublishQoS1Complete?.Invoke(this, eventArgs);
if (this.OnPublishQoS1Complete != null && this.OnPublishQoS1Complete.GetInvocationList().Length > 0)
{
var eventArgs = new OnPublishQoS1CompleteEventArgs(packet);
Logger.Trace("OnPublishQoS1CompleteEventLauncher");
_ = Task.Run(() => this.OnPublishQoS1Complete?.Invoke(this, eventArgs)).ContinueWith(
t =>
{
if (t.IsFaulted)
{
Logger.Error("OnPublishQoS1CompleteEventLauncher exception: " + t.Exception.Message);

Check warning on line 92 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Dereference of a possibly null reference.

Check warning on line 92 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Dereference of a possibly null reference.

Check warning on line 92 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Dereference of a possibly null reference.

Check warning on line 92 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Dereference of a possibly null reference.

Check warning on line 92 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Dereference of a possibly null reference.

Check warning on line 92 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Dereference of a possibly null reference.
}
},
TaskScheduler.Default);
}
}

/// <summary>
/// Gets the awaitable TaskCompletionSource for the QoS 1 publish transaction.
/// <para>
/// Valid for outgoing Publish messages QoS 1. A TaskCompletionSource that is set when the QoS 1 publish transaction is complete.
/// </para>
/// </summary>
public TaskCompletionSource<PubAckPacket> OnPublishQoS1CompleteTCS { get; } = new();

/// <summary>
/// Valid for outgoing Publish messages QoS 2. An event that is fired after the the QoS 2 PubComp is received.
/// </summary>
public event EventHandler<OnPublishQoS2CompleteEventArgs> OnPublishQoS2Complete = new((client, e) => { });

internal virtual void OnPublishQoS2CompleteEventLauncher(List<ControlPacket> packetList)
{
var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList);
Logger.Trace("OnPublishQoS2CompleteEventLauncher");
this.OnPublishQoS2Complete?.Invoke(this, eventArgs);
if (this.OnPublishQoS2Complete != null && this.OnPublishQoS2Complete.GetInvocationList().Length > 0)
{
var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList);
Logger.Trace("OnPublishQoS2CompleteEventLauncher");
_ = Task.Run(() => this.OnPublishQoS2Complete?.Invoke(this, eventArgs)).ContinueWith(
t =>
{
if (t.IsFaulted)
{
Logger.Error("OnPublishQoS2CompleteEventLauncher exception: " + t.Exception.Message);

Check warning on line 123 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Dereference of a possibly null reference.

Check warning on line 123 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Dereference of a possibly null reference.

Check warning on line 123 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Dereference of a possibly null reference.

Check warning on line 123 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Dereference of a possibly null reference.

Check warning on line 123 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Dereference of a possibly null reference.

Check warning on line 123 in Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Dereference of a possibly null reference.
}
},
TaskScheduler.Default);
}
}

/// <summary>
/// Gets the awaitable TaskCompletionSource for the QoS 2 publish transaction.
/// <para>
/// Valid for outgoing Publish messages QoS 2. A TaskCompletionSource that is set when the QoS 2 publish transaction is complete.
/// </para>
/// </summary>
public TaskCompletionSource<List<ControlPacket>> OnPublishQoS2CompleteTCS { get; } = new();

/// <summary>
/// Decode the received MQTT Publish packet.
/// </summary>
Expand Down
Loading