diff --git a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs index a91bf8e7..8f236078 100644 --- a/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs +++ b/Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs @@ -82,12 +82,6 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(CM)- Canceled & exiting..."); - return; - } - // If connected and no recent traffic, send a ping if (this.ConnectState == ConnectState.Connected) { @@ -108,31 +102,37 @@ private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task Logger.Debug($"{this.Options.ClientId}-(CM)- IPubTransactionQueue:....{this.IPubTransactionQueue.Count}/{this.IPubTransactionQueue.Capacity}"); Logger.Debug($"{this.Options.ClientId}-(CM)- # of Subscriptions:......{this.Subscriptions.Count}"); + // Background Tasks Health Check await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false); await this.RunTaskHealthCheckAsync(this.ConnectionReaderTask, "ConnectionReader").ConfigureAwait(false); await this.RunTaskHealthCheckAsync(this.ConnectionPublishWriterTask, "ConnectionPublishWriter").ConfigureAwait(false); await this.RunTaskHealthCheckAsync(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler").ConfigureAwait(false); + // Sleep cycle await Task.Delay(2000, cancellationToken).ConfigureAwait(false); - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(CM)- Canceled & exiting...."); - return; + + // Check for cancellation + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(CM)- Canceled & exiting..."); + break; + } } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(CM)- Exception: {ex}"); throw; } - - break; } } - Logger.Trace($"{this.Options.ClientId}-(CM)- Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(CM)- Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); }, cancellationToken); /// @@ -147,12 +147,6 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled & existing with {this.OutgoingPublishQueue.Count} publish packets remaining."); - return; - } - while (this.ConnectState != ConnectState.Connected) { Logger.Trace($"{this.Options.ClientId}-(PW)- Not connected. Waiting for connect..."); @@ -184,45 +178,41 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) = writeResult = await this.WriteAsync(publishPacket.Encode()).ConfigureAwait(false); this.OnPublishSentEventLauncher(publishPacket); - if (writeResult.IsCanceled) - { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Write Cancelled (exiting)..."); - return; - } - - if (writeResult.IsCompleted) + if (writeResult.IsCompleted || writeResult.IsCanceled) { - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: end of the stream"); + Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter: IsCompleted={writeResult.IsCompleted} IsCancelled={writeResult.IsCanceled}"); if (this.ConnectState == ConnectState.Connected) { // This is an unexpected exit and may be due to a network failure. - Logger.Debug($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter IsCompleted: this was unexpected"); + Logger.Debug($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter: unexpected exit. Disconnecting..."); await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } - Logger.Info($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); break; } - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(PW)- Cancelled & exiting..."); - return; + + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(PW)- Cancelled & existing with {this.OutgoingPublishQueue.Count} publish packets remaining."); + break; + } } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(PW)- Exception: {ex}"); throw; } - - break; } } // while(true) - Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); }, cancellationToken); /// @@ -237,12 +227,6 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled & exiting with {this.SendQueue.Count} packets remaining."); - return; - } - // We allow this task to run in Connecting, Connected, and Disconnecting states // because it is the one that has to send the CONNECT and DISCONNECT packets. while (this.ConnectState == ConnectState.Disconnected) @@ -321,27 +305,30 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task. await this.HandleDisconnectionAsync(false).ConfigureAwait(false); } - return; + break; + } + + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(W)- Cancelled & exiting with {this.SendQueue.Count} packets remaining."); + break; } - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(W)- Cancelled & exiting..."); - return; } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(W)- Exception: {ex}"); throw; } - - break; } } // while(true) - Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); return; }, cancellationToken); @@ -359,12 +346,6 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); - return true; - } - readResult = await this.ReadAsync().ConfigureAwait(false); if (readResult.IsCanceled || readResult.IsCompleted) @@ -418,6 +399,24 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => break; } + // Check that maximum packet size has not been exceeded + if (this.Options.ClientMaximumPacketSize != null) + { + if (decodedPacket.PacketSize > this.Options.ClientMaximumPacketSize) + { + Logger.Error($"Received a packet that exceeds the requested maximum of {this.Options.ClientMaximumPacketSize}. Disconnecting."); + Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {decodedPacket.PacketSize} for packet {decodedPacket.GetType().Name}"); + + var opts = new DisconnectOptions + { + ReasonCode = DisconnectReasonCode.PacketTooLarge, + ReasonString = "Packet size is larger than the requested Maximum Packet Size.", + }; + await this.DisconnectAsync(opts).ConfigureAwait(false); + return false; + } + } + // For QoS 1 and 2 publishes, potentially apply back pressure according to ReceiveMaximum if (decodedPacket is PublishPacket publishPacket) { @@ -445,27 +444,30 @@ private Task ConnectionReaderAsync(CancellationToken cancellationToken) => Logger.Trace($"{this.Options.ClientId}-(R)- <-- Received {decodedPacket.GetType().Name} id: {decodedPacket.PacketIdentifier}. Adding to receivedQueue."); this.ReceivedQueue.Enqueue(decodedPacket); - } // while (buffer.Length > 0 - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); - return true; + + // Check for cancellation + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(R)- Cancelled & exiting..."); + break; + } } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(R)- Exception: {ex}"); throw; } - - break; } } // while (this.ConnectState is ConnectState.Connecting or ConnectState.Connected) - Logger.Trace($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(R)- ConnectionReader Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); return true; }, cancellationToken); @@ -482,49 +484,12 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => { try { - if (cancellationToken.IsCancellationRequested) - { - Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining. Exiting..."); - return; - } - var packet = await this.ReceivedQueue.DequeueAsync(cancellationToken).ConfigureAwait(false); - if (this.Options.ClientMaximumPacketSize != null) - { - // FIXME: Move this to ConnectionReaderAsync/closer to the source instead of after the queue - if (packet.PacketSize > this.Options.ClientMaximumPacketSize) - { - Logger.Warn($"Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - Logger.Debug($"{this.Options.ClientId}-(RPH)- Received packet size {packet.PacketSize} exceeds maximum packet size {this.Options.ClientMaximumPacketSize}. Disconnecting."); - - var opts = new DisconnectOptions - { - ReasonCode = DisconnectReasonCode.PacketTooLarge, - ReasonString = "Packet Too Large", - }; - await this.DisconnectAsync(opts).ConfigureAwait(false); - return; - } - } switch (packet) { case ConnAckPacket connAckPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); - if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) - { - Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); - - // Replace the OPubTransactionQueue BoundedDictionary with a new one with the broker's ReceiveMaximum - this.OPubTransactionQueue = new BoundedDictionaryX>((int)connAckPacket.Properties.ReceiveMaximum); - } - - this.ConnectionProperties = connAckPacket.Properties; - this.OnConnAckReceivedEventLauncher(connAckPacket); - break; - case PingRespPacket pingRespPacket: - Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); - this.OnPingRespReceivedEventLauncher(pingRespPacket); + this.HandleIncomingConnAckPacket(connAckPacket); break; case SubAckPacket subAckPacket: Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received SubAck id={subAckPacket.PacketIdentifier}"); @@ -534,6 +499,7 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received UnsubAck id={unsubAckPacket.PacketIdentifier}"); this.OnUnsubAckReceivedEventLauncher(unsubAckPacket); break; + case PublishPacket publishPacket: this.HandleIncomingPublishPacket(publishPacket); break; @@ -549,6 +515,12 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => case PubCompPacket pubCompPacket: this.HandleIncomingPubCompPacket(pubCompPacket); break; + + case PingRespPacket pingRespPacket: + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received PingResp id={pingRespPacket.PacketIdentifier}"); + this.OnPingRespReceivedEventLauncher(pingRespPacket); + break; + case DisconnectPacket disconnectPacket: // Disconnects are handled immediate and shouldn't be received here // We leave this just as a sanity backup @@ -559,28 +531,53 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Logger.Error($"Unrecognized packet received. Will discard. {packet}"); break; } // switch (packet) - } - catch (TaskCanceledException) - { - Logger.Debug($"{this.Options.ClientId}-(RPH)- Cancelled & exiting..."); - return; + + if (cancellationToken.IsCancellationRequested) + { + Logger.Trace($"{this.Options.ClientId}-(RPH)- Cancelled with {this.ReceivedQueue.Count} received packets remaining. Exiting..."); + break; + } + } catch (Exception ex) { - if (!cancellationToken.IsCancellationRequested) + if (ex is TaskCanceledException || cancellationToken.IsCancellationRequested) + { + break; + } + else { Logger.Error($"{this.Options.ClientId}-(RPH)- Exception: {ex}"); throw; } - - break; } } // while (true) - Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}"); + Logger.Debug($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}, cancellationRequested={cancellationToken.IsCancellationRequested}"); return; }, cancellationToken); + + /// + /// Handle an incoming ConnAck packet. + /// + /// The received ConnAck packet. + internal void HandleIncomingConnAckPacket(ConnAckPacket connAckPacket) + { + Logger.Trace($"{this.Options.ClientId}-(RPH)- <-- Received ConnAck id={connAckPacket.PacketIdentifier}"); + if (connAckPacket.ReasonCode == ConnAckReasonCode.Success && connAckPacket.Properties.ReceiveMaximum != null) + { + Logger.Debug($"{this.Options.ClientId}-(RPH)- <-- Broker says limit concurrent incoming QoS 1 and QoS 2 publishes to {connAckPacket.Properties.ReceiveMaximum}."); + + // FIXME: A resize would be better to not lose any existing. Can we send publishes before the CONNACK? + // Replace the OPubTransactionQueue BoundedDictionary with a new one with the broker's ReceiveMaximum + this.OPubTransactionQueue = new BoundedDictionaryX>((int)connAckPacket.Properties.ReceiveMaximum); + } + + this.ConnectionProperties = connAckPacket.Properties; + this.OnConnAckReceivedEventLauncher(connAckPacket); + } + /// /// Handle an incoming Publish packet. ///