Skip to content

Commit

Permalink
Move TaskCompletionSource down to PublishPacket class
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo committed May 24, 2024
1 parent 635ef37 commit d51a86e
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 24 deletions.
21 changes: 3 additions & 18 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -207,37 +207,25 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
{
// QoS 1: Acknowledged Delivery
var taskCompletionSource = new TaskCompletionSource<PubAckPacket>();
void TaskHandler(object? sender, OnPublishQoS1CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubAckPacket);
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS1Complete += eventHandler;

Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);

publishPacket.OnPublishQoS1Complete -= eventHandler;
// Wait on the QoS 1 handshake
var pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
return new PublishResult(publishPacket.Message, pubAckPacket);
}
else if (message.QoS == QualityOfService.ExactlyOnceDelivery)
{
// QoS 2: Assured Delivery
PublishResult? publishResult = null;
var taskCompletionSource = new TaskCompletionSource<List<ControlPacket>>();
void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PacketList);
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS2Complete += eventHandler;

Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

List<ControlPacket> packetList;
try
{
// Wait on the QoS 2 handshake
// FIXME: Timeout value
packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand All @@ -254,7 +242,6 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
{
QoS2ReasonCode = null,
};
publishPacket.OnPublishQoS2Complete -= eventHandler;
return publishResult;
}

Expand All @@ -271,8 +258,6 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
throw new HiveMQttClientException("PublishAsync: QoS 2 complete but no PubRec packet received.");
}

// Remove our wait handler
publishPacket.OnPublishQoS2Complete -= eventHandler;
return publishResult;
}

Expand Down
64 changes: 58 additions & 6 deletions Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ public PublishPacket(MQTT5PublishMessage message, int packetIdentifier)
{
this.PacketIdentifier = (ushort)packetIdentifier;
this.Message = message;

// Setup the QoS 1 TaskCompletionSource so users can simply call
//
// await PublishPacket.OnPublishQoS1CompleteTCS
//
// to wait for the QoS 1 publish to complete.
this.OnPublishQoS1Complete += (sender, args) => this.OnPublishQoS1CompleteTCS.SetResult(args.PubAckPacket);

// Setup the QoS 2 TaskCompletionSource so users can simply call
//
// await PublishPacket.OnPublishQoS2CompleteTCS
//
// to wait for the QoS 2 publish to complete.
this.OnPublishQoS2Complete += (sender, args) => this.OnPublishQoS2CompleteTCS.SetResult(args.PacketList);
}

/// <summary>
Expand Down Expand Up @@ -66,23 +80,61 @@ public PublishPacket(ReadOnlySequence<byte> packetData)

internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet)
{
var eventArgs = new OnPublishQoS1CompleteEventArgs(packet);
Logger.Trace("OnPublishQoS1CompleteEventLauncher");
this.OnPublishQoS1Complete?.Invoke(this, eventArgs);
if (this.OnPublishQoS1Complete != null && this.OnPublishQoS1Complete.GetInvocationList().Length > 0)
{
var eventArgs = new OnPublishQoS1CompleteEventArgs(packet);
Logger.Trace("OnPublishQoS1CompleteEventLauncher");
_ = Task.Run(() => this.OnPublishQoS1Complete?.Invoke(this, eventArgs)).ContinueWith(
t =>
{
if (t.IsFaulted)
{
Logger.Error("OnPublishQoS1CompleteEventLauncher exception: " + t.Exception.Message);
}
},
TaskScheduler.Default);
}
}

/// <summary>
/// Gets the awaitable TaskCompletionSource for the QoS 1 publish transaction.
/// <para>
/// Valid for outgoing Publish messages QoS 1. A TaskCompletionSource that is set when the QoS 1 publish transaction is complete.
/// </para>
/// </summary>
public TaskCompletionSource<PubAckPacket> OnPublishQoS1CompleteTCS { get; } = new();

/// <summary>
/// Valid for outgoing Publish messages QoS 2. An event that is fired after the the QoS 2 PubComp is received.
/// </summary>
public event EventHandler<OnPublishQoS2CompleteEventArgs> OnPublishQoS2Complete = new((client, e) => { });

internal virtual void OnPublishQoS2CompleteEventLauncher(List<ControlPacket> packetList)
{
var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList);
Logger.Trace("OnPublishQoS2CompleteEventLauncher");
this.OnPublishQoS2Complete?.Invoke(this, eventArgs);
if (this.OnPublishQoS2Complete != null && this.OnPublishQoS2Complete.GetInvocationList().Length > 0)
{
var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList);
Logger.Trace("OnPublishQoS2CompleteEventLauncher");
_ = Task.Run(() => this.OnPublishQoS2Complete?.Invoke(this, eventArgs)).ContinueWith(
t =>
{
if (t.IsFaulted)
{
Logger.Error("OnPublishQoS2CompleteEventLauncher exception: " + t.Exception.Message);
}
},
TaskScheduler.Default);
}
}

/// <summary>
/// Gets the awaitable TaskCompletionSource for the QoS 2 publish transaction.
/// <para>
/// Valid for outgoing Publish messages QoS 2. A TaskCompletionSource that is set when the QoS 2 publish transaction is complete.
/// </para>
/// </summary>
public TaskCompletionSource<List<ControlPacket>> OnPublishQoS2CompleteTCS { get; } = new();

/// <summary>
/// Decode the received MQTT Publish packet.
/// </summary>
Expand Down

0 comments on commit d51a86e

Please sign in to comment.