diff --git a/Source/HiveMQtt/Client/HiveMQClient.cs b/Source/HiveMQtt/Client/HiveMQClient.cs index 9542b751..20685208 100644 --- a/Source/HiveMQtt/Client/HiveMQClient.cs +++ b/Source/HiveMQtt/Client/HiveMQClient.cs @@ -207,28 +207,17 @@ public async Task PublishAsync(MQTT5PublishMessage message) else if (message.QoS == QualityOfService.AtLeastOnceDelivery) { // QoS 1: Acknowledged Delivery - var taskCompletionSource = new TaskCompletionSource(); - void TaskHandler(object? sender, OnPublishQoS1CompleteEventArgs args) => taskCompletionSource.SetResult(args.PubAckPacket); - EventHandler eventHandler = TaskHandler; - publishPacket.OnPublishQoS1Complete += eventHandler; - Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); - var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); - - publishPacket.OnPublishQoS1Complete -= eventHandler; + // Wait on the QoS 1 handshake + var pubAckPacket = await publishPacket.OnPublishQoS1CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); return new PublishResult(publishPacket.Message, pubAckPacket); } else if (message.QoS == QualityOfService.ExactlyOnceDelivery) { // QoS 2: Assured Delivery PublishResult? publishResult = null; - var taskCompletionSource = new TaskCompletionSource>(); - void TaskHandler(object? sender, OnPublishQoS2CompleteEventArgs args) => taskCompletionSource.SetResult(args.PacketList); - EventHandler eventHandler = TaskHandler; - publishPacket.OnPublishQoS2Complete += eventHandler; - Logger.Trace($"Queuing packet for send: {publishPacket.GetType().Name} id={publishPacket.PacketIdentifier}"); this.OutgoingPublishQueue.Enqueue(publishPacket); @@ -236,8 +225,7 @@ public async Task PublishAsync(MQTT5PublishMessage message) try { // Wait on the QoS 2 handshake - // FIXME: Timeout value - packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); + packetList = await publishPacket.OnPublishQoS2CompleteTCS.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false); } catch (TimeoutException) { @@ -254,7 +242,6 @@ public async Task PublishAsync(MQTT5PublishMessage message) { QoS2ReasonCode = null, }; - publishPacket.OnPublishQoS2Complete -= eventHandler; return publishResult; } @@ -271,8 +258,6 @@ public async Task PublishAsync(MQTT5PublishMessage message) throw new HiveMQttClientException("PublishAsync: QoS 2 complete but no PubRec packet received."); } - // Remove our wait handler - publishPacket.OnPublishQoS2Complete -= eventHandler; return publishResult; } diff --git a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs index d91e18da..a8bc3882 100644 --- a/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/PublishPacket.cs @@ -39,6 +39,20 @@ public PublishPacket(MQTT5PublishMessage message, int packetIdentifier) { this.PacketIdentifier = (ushort)packetIdentifier; this.Message = message; + + // Setup the QoS 1 TaskCompletionSource so users can simply call + // + // await PublishPacket.OnPublishQoS1CompleteTCS + // + // to wait for the QoS 1 publish to complete. + this.OnPublishQoS1Complete += (sender, args) => this.OnPublishQoS1CompleteTCS.SetResult(args.PubAckPacket); + + // Setup the QoS 2 TaskCompletionSource so users can simply call + // + // await PublishPacket.OnPublishQoS2CompleteTCS + // + // to wait for the QoS 2 publish to complete. + this.OnPublishQoS2Complete += (sender, args) => this.OnPublishQoS2CompleteTCS.SetResult(args.PacketList); } /// @@ -66,11 +80,30 @@ public PublishPacket(ReadOnlySequence packetData) internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet) { - var eventArgs = new OnPublishQoS1CompleteEventArgs(packet); - Logger.Trace("OnPublishQoS1CompleteEventLauncher"); - this.OnPublishQoS1Complete?.Invoke(this, eventArgs); + if (this.OnPublishQoS1Complete != null && this.OnPublishQoS1Complete.GetInvocationList().Length > 0) + { + var eventArgs = new OnPublishQoS1CompleteEventArgs(packet); + Logger.Trace("OnPublishQoS1CompleteEventLauncher"); + _ = Task.Run(() => this.OnPublishQoS1Complete?.Invoke(this, eventArgs)).ContinueWith( + t => + { + if (t.IsFaulted) + { + Logger.Error("OnPublishQoS1CompleteEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } + /// + /// Gets the awaitable TaskCompletionSource for the QoS 1 publish transaction. + /// + /// Valid for outgoing Publish messages QoS 1. A TaskCompletionSource that is set when the QoS 1 publish transaction is complete. + /// + /// + public TaskCompletionSource OnPublishQoS1CompleteTCS { get; } = new(); + /// /// Valid for outgoing Publish messages QoS 2. An event that is fired after the the QoS 2 PubComp is received. /// @@ -78,11 +111,30 @@ internal virtual void OnPublishQoS1CompleteEventLauncher(PubAckPacket packet) internal virtual void OnPublishQoS2CompleteEventLauncher(List packetList) { - var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList); - Logger.Trace("OnPublishQoS2CompleteEventLauncher"); - this.OnPublishQoS2Complete?.Invoke(this, eventArgs); + if (this.OnPublishQoS2Complete != null && this.OnPublishQoS2Complete.GetInvocationList().Length > 0) + { + var eventArgs = new OnPublishQoS2CompleteEventArgs(packetList); + Logger.Trace("OnPublishQoS2CompleteEventLauncher"); + _ = Task.Run(() => this.OnPublishQoS2Complete?.Invoke(this, eventArgs)).ContinueWith( + t => + { + if (t.IsFaulted) + { + Logger.Error("OnPublishQoS2CompleteEventLauncher exception: " + t.Exception.Message); + } + }, + TaskScheduler.Default); + } } + /// + /// Gets the awaitable TaskCompletionSource for the QoS 2 publish transaction. + /// + /// Valid for outgoing Publish messages QoS 2. A TaskCompletionSource that is set when the QoS 2 publish transaction is complete. + /// + /// + public TaskCompletionSource> OnPublishQoS2CompleteTCS { get; } = new(); + /// /// Decode the received MQTT Publish packet. ///