diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index 1f7b5440..eae79c81 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -82,12 +82,6 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(CM)- Canceled & exiting..."); - return; - } - // If connected and no recent traffic, send a ping if (this.ConnectState == ConnectState.Connected) { @@ -108,31 +102,37 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task Logger.Debug($"{this.Options.ClientId}-(CM)- IPubTransactionQueue:....{this.IPubTransactionQueue.Count}/{this.IPubTransactionQueue.Capacity}"); Logger.Debug($"{this.Options.ClientId}-(CM)- # of Subscriptions:......{this.Subscriptions.Count}"); + // Background Tasks Health Check await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false); await this.RunTaskHealthCheckAsync(this.ConnectionReaderTask, "ConnectionReader").ConfigureAwait(false); await this.RunTaskHealthCheckAsync(this.ConnectionPublishWriterTask, "ConnectionPublishWriter").ConfigureAwait(false); await this.RunTaskHealthCheckAsync(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler").ConfigureAwait(false); + // Sleep cycle await Task.Delay(2000, cancellationToken).ConfigureAwait(false); - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(CM)- Canceled & exiting...."); - return; + + // Check for cancellation + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(CM)- Canceled & exiting..."); + break; + } } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(CM)- Exception: {ex}"); throw; } - - break; } } - Logger.Trace($"{this.Options.ClientId}-(CM)- Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); }, cancellationToken); /// @@ -147,12 +147,6 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled & existing with {this.OutgoingPublishQueue.Count} publish packets remaining."); - return; - } - while (this.ConnectState != ConnectState.Connected) { Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect..."); @@ -184,45 +178,41 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); this.OnPublishSentEventLauncher(publishPacket); - if (writeResult.IsCanceled) + if (writeResult.IsCompleted || writeResult.IsCanceled) { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled (exiting)..."); - return; - } - - if (writeResult.IsCompleted) - { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter: IsCompleted={writeResult.IsCompleted} IsCancelled={writeResult.IsCanceled}"); if (this.ConnectState == ConnectState.Connected) { // This is an unexpected exit and may be due to a network failure. - Logger.Debug($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: this was unexpected"); + Logger.Debug($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter: unexpected exit. Disconnecting..."); await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } - Logger.Info($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); break; } - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(PW)- Cancelled & exiting..."); - return; + + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled & existing with {this.OutgoingPublishQueue.Count} publish packets remaining."); + break; + } } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(PW)- Exception: {ex}"); throw; } - - break; } } // while(true) - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); }, cancellationToken); /// @@ -237,12 +227,6 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled & exiting with {this.SendQueue.Count} packets remaining."); - return; - } - // We allow this task to run in Connecting, Connected, and Disconnecting states // because it is the one that has to send the CONNECT and DISCONNECT packets. while (this.ConnectState == ConnectState.Disconnected) @@ -278,15 +262,12 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. writeResult = await this.WriteAsync(unsubscribePacket.Encode()).ConfigureAwait(false); this.OnUnsubscribeSentEventLauncher(unsubscribePacket); break; - case PublishPacket publishPacket: - throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); case PubAckPacket pubAckPacket: Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubAckPacket id={pubAckPacket.PacketIdentifier} reason={pubAckPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubAckPacket.Encode()).ConfigureAwait(false); this.HandleSentPubAckPacket(pubAckPacket); break; - case PubRecPacket pubRecPacket: Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PubRecPacket id={pubRecPacket.PacketIdentifier} reason={pubRecPacket.ReasonCode}"); writeResult = await this.WriteAsync(pubRecPacket.Encode()).ConfigureAwait(false); @@ -302,54 +283,52 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. writeResult = await this.WriteAsync(pubCompPacket.Encode()).ConfigureAwait(false); this.HandleSentPubCompPacket(pubCompPacket); break; + case PingReqPacket pingReqPacket: Logger.Trace($"{this.Options.ClientId}-(W)- --> Sending PingReqPacket id={pingReqPacket.PacketIdentifier}"); writeResult = await this.WriteAsync(PingReqPacket.Encode()).ConfigureAwait(false); this.OnPingReqSentEventLauncher(pingReqPacket); break; + + case PublishPacket publishPacket: + throw new HiveMQttClientException("PublishPacket should be sent via ConnectionPublishWriterAsync."); default: Logger.Trace($"{this.Options.ClientId}-(W)- --> Unknown packet type {packet}"); break; } // switch - if (writeResult.IsCanceled) - { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Write Cancelled. Exiting..."); - return; - } - - if (writeResult.IsCompleted) + if (writeResult.IsCompleted || writeResult.IsCanceled) { - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: end of the stream. Exiting..."); + Logger.Debug($"{this.Options.ClientId}-(W)- ConnectionWriter exiting: IsCompleted={writeResult.IsCompleted} IsCancelled={writeResult.IsCanceled}"); if (this.ConnectState == ConnectState.Connected) { - // This is an unexpected exit and may be due to a network failure. - Logger.Debug($"{this.Options.ClientId}-(W)- ConnectionWriter IsCompleted: this was unexpected"); await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } - Logger.Info($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); - return; + break; + } + + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled & exiting with {this.SendQueue.Count} packets remaining."); + break; } - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(W)- Cancelled & exiting..."); - return; } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(W)- Exception: {ex}"); throw; } - - break; } } // while(true) - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); return; }, cancellationToken); @@ -367,31 +346,16 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); - return true; - } - readResult = await this.ReadAsync().ConfigureAwait(false); - if (readResult.IsCanceled) + if (readResult.IsCanceled || readResult.IsCompleted) { - Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled read result. Exiting..."); - return true; - } - - if (readResult.IsCompleted) - { - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: end of the stream"); + Logger.Debug($"{this.Options.ClientId}-(R)- ConnectionReader exiting: IsCompleted={readResult.IsCompleted} IsCancelled={readResult.IsCanceled}"); if (this.ConnectState == ConnectState.Connected) { - // This is an unexpected exit and may be due to a network failure. - Logger.Debug($"{this.Options.ClientId}-(R)- ConnectionReader IsCompleted: this was unexpected"); await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } - Logger.Info($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); return true; } @@ -403,13 +367,13 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => { if (decodedPacket is MalformedPacket) { - Logger.Warn($"Malformed packet received. Disconnecting."); + Logger.Error($"Malformed packet received. Disconnecting..."); Logger.Debug($"{this.Options.ClientId}-(R)- Malformed packet received: {decodedPacket}"); var opts = new DisconnectOptions { ReasonCode = DisconnectReasonCode.MalformedPacket, - ReasonString = "Malformed Packet", + ReasonString = "Client couldn't decode packet.", }; return await this.DisconnectAsync(opts).ConfigureAwait(false); } @@ -429,12 +393,26 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => // We handle disconnects immediately if (decodedPacket is DisconnectPacket disconnectPacket) { - Logger.Error($"--> Disconnect received <--: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); - await this.HandleDisconnectionAsync(false).ConfigureAwait(false); - this.OnDisconnectReceivedEventLauncher(disconnectPacket); + await this.HandleIncomingDisconnectPacketAsync(disconnectPacket).ConfigureAwait(false); break; } + // Check that maximum packet size has not been exceeded + if (this.Options.ClientMaximumPacketSize is not null && decodedPacket.PacketSize > this.Options.ClientMaximumPacketSize) + { + Logger.Error($"Received a packet that exceeds the requested maximum of {this.Options.ClientMaximumPacketSize}. Disconnecting."); + Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {decodedPacket.PacketSize} for packet {decodedPacket.GetType().Name}"); + + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.PacketTooLarge, + ReasonString = "Packet size is larger than the requested Maximum Packet Size.", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); + return false; + } + + // For QoS 1 and 2 publishes, potentially apply back pressure according to ReceiveMaximum if (decodedPacket is PublishPacket publishPacket) { // Limit the number of concurrent incoming QoS 1 and QoS 2 transactions @@ -447,38 +425,44 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => if (!success) { - Logger.Warn($"Duplicate packet ID detected {publishPacket.PacketIdentifier} while queueing to transaction queue for an incoming QoS {publishPacket.Message.QoS} publish ."); - - // FIXME: We should potentially disconnect here - continue; + Logger.Error($"Received a publish with a duplicate packet identifier {publishPacket.PacketIdentifier} for a transaction already in progress. Disconnecting."); + + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.UnspecifiedError, + ReasonString = "Client received a publish with duplicate packet identifier for a transaction already in progress.", + }; + return await this.DisconnectAsync(opts).ConfigureAwait(false); } } } Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name} id: {decodedPacket.PacketIdentifier}. Adding to receivedQueue."); - - // Add the packet to the received queue for processing later by ReceivedPacketsHandlerAsync this.ReceivedQueue.Enqueue(decodedPacket); } // while (buffer.Length > 0 - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); - return true; + + // Check for cancellation + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); + break; + } } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(R)- Exception: {ex}"); throw; } - - break; } } // while (this.ConnectState is ConnectState.Connecting or ConnectState.Connected) - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); return true; }, cancellationToken); @@ -495,49 +479,12 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining. Exiting..."); - return; - } - var packet = await this.ReceivedQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - if (this.Options.ClientMaximumPacketSize != null) - { - // FIXME: Move this to ConnectionReaderAsync/closer to the source instead of after the queue - if (packet.PacketSize > this.Options.ClientMaximumPacketSize) - { - Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - - var opts = new DisconnectOptions - { - ReasonCode = DisconnectReasonCode.PacketTooLarge, - ReasonString = "Packet Too Large", - }; - await this.DisconnectAsync(opts).ConfigureAwait(false); - return; - } - } switch (packet) { case ConnAckPacket connAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); - if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) - { - Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); - - // Replace the OPubTransactionQueue BoundedDictionary with a new one with the broker's ReceiveMaximum - this.OPubTransactionQueue = new BoundedDictionaryX>((int)connAckPacket.Properties.ReceiveMaximum); - } - - this.ConnectionProperties = connAckPacket.Properties; - this.OnConnAckReceivedEventLauncher(connAckPacket); - break; - case PingRespPacket pingRespPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); - this.OnPingRespReceivedEventLauncher(pingRespPacket); + this.HandleIncomingConnAckPacket(connAckPacket); break; case SubAckPacket subAckPacket: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); @@ -547,6 +494,7 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); break; + case PublishPacket publishPacket: this.HandleIncomingPublishPacket(publishPacket); break; @@ -562,6 +510,12 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => case PubCompPacket pubCompPacket: this.HandleIncomingPubCompPacket(pubCompPacket); break; + + case PingRespPacket pingRespPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); + this.OnPingRespReceivedEventLauncher(pingRespPacket); + break; + case DisconnectPacket disconnectPacket: // Disconnects are handled immediate and shouldn't be received here // We leave this just as a sanity backup @@ -572,28 +526,63 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Logger.Error($"Unrecognized packet received. Will discard. {packet}"); break; } // switch (packet) - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(RPH)- Cancelled & exiting..."); - return; + + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining. Exiting..."); + break; + } } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(RPH)- Exception: {ex}"); throw; } - - break; } } // while (true) - Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); return; }, cancellationToken); + /// + /// Handle an incoming ConnAck packet. + /// + /// The received ConnAck packet. + internal void HandleIncomingConnAckPacket(ConnAckPacket connAckPacket) + { + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) + { + Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); + + // FIXME: A resize would be better to not lose any existing. Can we send publishes before the CONNACK? + // Replace the OPubTransactionQueue BoundedDictionary with a new one with the broker's ReceiveMaximum + this.OPubTransactionQueue = new BoundedDictionaryX>((int)connAckPacket.Properties.ReceiveMaximum); + } + + this.ConnectionProperties = connAckPacket.Properties; + this.OnConnAckReceivedEventLauncher(connAckPacket); + } + + /// + /// Handle an incoming Disconnect packet. + /// + /// The received Disconnect packet. + /// A task that represents the asynchronous operation. + internal async Task HandleIncomingDisconnectPacketAsync(DisconnectPacket disconnectPacket) + { + Logger.Error($"--> Disconnect received <--: {disconnectPacket.DisconnectReasonCode} {disconnectPacket.Properties.ReasonString}"); + await this.HandleDisconnectionAsync(false).ConfigureAwait(false); + this.OnDisconnectReceivedEventLauncher(disconnectPacket); + } + /// /// Handle an incoming Publish packet. /// @@ -623,20 +612,27 @@ internal async void HandleIncomingPublishPacket(PublishPacket publishPacket) if (success) { // Update the chain in the queue - if (!this.IPubTransactionQueue.TryUpdate(publishPacket.PacketIdentifier, publishQoS1Chain, publishQoS1Chain)) + if (this.IPubTransactionQueue.TryUpdate(publishPacket.PacketIdentifier, publishQoS1Chain, publishQoS1Chain)) + { + this.SendQueue.Enqueue(pubAckResponse); + } + else { Logger.Error($"QoS1: Couldn't update Publish --> PubAck QoS1 Chain for packet identifier {publishPacket.PacketIdentifier}. Discarded."); this.IPubTransactionQueue.Remove(publishPacket.PacketIdentifier, out _); + + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.UnspecifiedError, + ReasonString = "Client internal error updating publish transaction chain.", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); } } else { - // FIXME: This should never happen if ConnectionReaderAsync is working correctly - Logger.Error($"QoS1: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded."); - return; + throw new HiveMQttClientException($"QoS1: Received Publish with an unknown packet identifier {publishPacket.PacketIdentifier}. Discarded."); } - - this.SendQueue.Enqueue(pubAckResponse); } else if (publishPacket.Message.QoS is QualityOfService.ExactlyOnceDelivery) { diff --git a/Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs b/Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs index 974ed16d..e74625c9 100644 --- a/Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs +++ b/Source/HiveMQtt/MQTT5/Packets/SubscribePacket.cs @@ -50,7 +50,6 @@ public SubscribePacket(SubscribeOptions options, ushort packetIdentifier, Dictio // // to wait for the subscribe transaction to complete. this.OnComplete += (sender, args) => this.OnCompleteTCS.SetResult(args.SubAckPacket); - } /// @@ -61,7 +60,6 @@ public SubscribePacket(SubscribeOptions options, ushort packetIdentifier, Dictio /// public override ControlPacketType ControlPacketType => ControlPacketType.Subscribe; - /// /// Valid for outgoing Subscribe packets. An event that is fired after the the subscribe transaction is complete. /// @@ -100,7 +98,6 @@ internal virtual void OnCompleteEventLauncher(SubAckPacket packet) /// public TaskCompletionSource OnCompleteTCS { get; } = new(); - /// /// Encode this packet to be sent on the wire. ///