Skip to content

Commit

Permalink
Subscribe: Better transaction management (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Jun 13, 2024
1 parent 4cc7e20 commit 22d23eb
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
5 changes: 3 additions & 2 deletions Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,10 @@ public void Validate()
{
this.GenerateClientID();
}
else if (this.ClientId.Length > 23)

if (this.ClientId.Length > 23)

Check warning on line 289 in Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Dereference of a possibly null reference.

Check warning on line 289 in Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Dereference of a possibly null reference.

Check warning on line 289 in Source/HiveMQtt/Client/Options/HiveMQClientOptions.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x

Dereference of a possibly null reference.
{
// FIXME: Warn on exceeded length; can use but it may not work...
Logger.Info($"Client ID {this.ClientId} is longer than 23 characters. This may cause issues with some brokers.");
}
}

Expand Down
50 changes: 49 additions & 1 deletion Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
namespace HiveMQtt.MQTT5.Packets;

using System.IO;

using HiveMQtt.Client.Events;
using HiveMQtt.Client.Options;
using HiveMQtt.MQTT5.Types;

Expand All @@ -43,6 +43,14 @@ public SubscribePacket(SubscribeOptions options, ushort packetIdentifier, Dictio
{
this.Properties.UserProperties = userProperties;
}

// Setup the TaskCompletionSource so users can simply call
//
// await SubscribePacket.OnCompleteTCS
//
// to wait for the subscribe transaction to complete.
this.OnComplete += (sender, args) => this.OnCompleteTCS.SetResult(args.SubAckPacket);

}

Check warning on line 54 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Check warning on line 54 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Check warning on line 54 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x


/// <summary>
Expand All @@ -53,6 +61,46 @@ public SubscribePacket(SubscribeOptions options, ushort packetIdentifier, Dictio
/// <inheritdoc/>
public override ControlPacketType ControlPacketType => ControlPacketType.Subscribe;

Check warning on line 63 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Check warning on line 63 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Check warning on line 63 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x


/// <summary>
/// Valid for outgoing Subscribe packets. An event that is fired after the the subscribe transaction is complete.
/// </summary>
public event EventHandler<OnSubAckReceivedEventArgs> OnComplete = new((client, e) => { });

internal virtual void OnCompleteEventLauncher(SubAckPacket packet)
{
if (this.OnComplete != null && this.OnComplete.GetInvocationList().Length > 0)
{
var eventArgs = new OnSubAckReceivedEventArgs(packet);
Logger.Trace("SubscribePacket.OnCompleteEventLauncher");
_ = Task.Run(() => this.OnComplete?.Invoke(this, eventArgs)).ContinueWith(
t =>
{
if (t.IsFaulted)
{
if (t.Exception is not null)
{
Logger.Error("SubscribePacket.OnCompleteEventLauncher exception: " + t.Exception.Message);
foreach (var ex in t.Exception.InnerExceptions)
{
Logger.Error("SubscribePacket.OnCompleteEventLauncher inner exception: " + ex.Message);
}
}
}
},
TaskScheduler.Default);
}
}

/// <summary>
/// Gets the awaitable TaskCompletionSource for the subscribe transaction.
/// <para>
/// Valid for outgoing subscribe packets. A TaskCompletionSource that is set when the subscribe transaction is complete.
/// </para>
/// </summary>
public TaskCompletionSource<SubAckPacket> OnCompleteTCS { get; } = new();

Check warning on line 102 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-6.0.x

Check warning on line 102 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-7.0.x

Check warning on line 102 in Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs

View workflow job for this annotation

GitHub Actions / pipeline-ubuntu-latest-dotnet-8.0.x


/// <summary>
/// Encode this packet to be sent on the wire.
/// </summary>
Expand Down

0 comments on commit 22d23eb

Please sign in to comment.