Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Handling & QoS 2 Improvements #157

Merged
merged 13 commits into from
May 16, 2024
1 change: 1 addition & 0 deletions Documentation/docs/how-to/configure-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Setting `minlevel` to `Trace` will output all activity in the HiveMQtt package d
```log
2024-03-14 15:40:18.2252|TRACE|HiveMQtt.Client.HiveMQClient|Trace Level Logging Legend:
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(W)- == ConnectionWriter
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(PW)- == ConnectionPublishWriter
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(R)- == ConnectionReader
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(RPH)- == ReceivedPacketsHandler
2024-03-14 15:40:18.2320|INFO|HiveMQtt.Client.HiveMQClient|Connecting to broker at 127.0.0.1:1883
Expand Down
7 changes: 0 additions & 7 deletions Source/HiveMQtt/Client/Exceptions/HiveMQttClientException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,4 @@ public HiveMQttClientException(string message, Exception inner)
: base(message, inner)
{
}

protected HiveMQttClientException(
System.Runtime.Serialization.SerializationInfo info,
System.Runtime.Serialization.StreamingContext context)
: base(info, context)
{
}
}
53 changes: 34 additions & 19 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public HiveMQClient(HiveMQClientOptions? options = null)

Logger.Trace("Trace Level Logging Legend:");
Logger.Trace(" -(W)- == ConnectionWriter");
Logger.Trace(" -(PW)- == ConnectionPublishWriter");
Logger.Trace(" -(R)- == ConnectionReader");
Logger.Trace(" -(CM)- == ConnectionMonitor");
Logger.Trace(" -(RPH)- == ReceivedPacketsHandler");
Expand Down Expand Up @@ -91,7 +92,7 @@ public async Task<ConnectResult> ConnectAsync()
// Construct the MQTT Connect packet and queue to send
var connPacket = new ConnectPacket(this.Options);
Logger.Trace($"Queuing packet for send: {connPacket}");
this.SendQueue.Add(connPacket);
this.SendQueue.Enqueue(connPacket);

ConnAckPacket connAck;
ConnectResult connectResult;
Expand Down Expand Up @@ -162,11 +163,11 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
this.OnDisconnectSent += eventHandler;

Logger.Trace($"Queuing packet for send: {disconnectPacket}");
this.SendQueue.Add(disconnectPacket);
this.SendQueue.Enqueue(disconnectPacket);

try
{
disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand Down Expand Up @@ -200,7 +201,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
if (message.QoS == QualityOfService.AtMostOnceDelivery)
{
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.SendQueue.Add(publishPacket);
this.OutgoingPublishQueue.Enqueue(publishPacket);
return new PublishResult(publishPacket.Message);
}
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
Expand All @@ -211,11 +212,10 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS1Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.SendQueue.Add(publishPacket);
this.OutgoingPublishQueue.Enqueue(publishPacket);

var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);

publishPacket.OnPublishQoS1Complete -= eventHandler;
return new PublishResult(publishPacket.Message, pubAckPacket);
Expand All @@ -229,20 +229,31 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS2Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
this.SendQueue.Add(publishPacket);
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

List<ControlPacket> packetList;
try
{
// Wait on the QoS 2 handshake
packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
// FIXME: Timeout value
packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Logger.Error("PublishAsync: QoS 2 timeout. No response received in time.");
publishResult = new PublishResult(publishPacket.Message);
publishResult.QoS2ReasonCode = null;

// Remove the transaction chain
if (this.transactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain))
{
Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}.");
}

// Prepare PublishResult
publishResult = new PublishResult(publishPacket.Message)
{
QoS2ReasonCode = null,
};
publishPacket.OnPublishQoS2Complete -= eventHandler;
return publishResult;
}
Expand Down Expand Up @@ -329,13 +340,13 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
this.OnSubAckReceived += eventHandler;

// Queue the constructed packet to be sent on the wire
this.SendQueue.Add(subscribePacket);
this.SendQueue.Enqueue(subscribePacket);

SubAckPacket subAck;
SubscribeResult subscribeResult;
try
{
subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand Down Expand Up @@ -438,14 +449,14 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = TaskHandler;
this.OnUnsubAckReceived += eventHandler;

this.SendQueue.Add(unsubscribePacket);
this.SendQueue.Enqueue(unsubscribePacket);

// FIXME: Cancellation token and better timeout value
UnsubAckPacket unsubAck;
UnsubscribeResult unsubscribeResult;
try
{
unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);

// FIXME: Validate that the packet identifier matches
}
Expand Down Expand Up @@ -497,15 +508,19 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)

if (clean)
{
if (this.SendQueue.Count > 0)
if (!this.SendQueue.IsEmpty)
{
Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting.");
}

// We only clear the send queue on explicit disconnect
while (this.SendQueue.TryTake(out _))
if (!this.OutgoingPublishQueue.IsEmpty)
{
Logger.Warn($"HandleDisconnection: Outgoing publish queue not empty. {this.OutgoingPublishQueue.Count} packets pending but we are disconnecting.");
}

// We only clear the queues on explicit disconnect
this.SendQueue.Clear();
this.OutgoingPublishQueue.Clear();
}

// Fire the corresponding after event
Expand Down
Loading
Loading