Skip to content

Commit

Permalink
Event Handling & QoS 2 Improvements (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored May 16, 2024
1 parent cc20e19 commit d043390
Show file tree
Hide file tree
Showing 12 changed files with 803 additions and 235 deletions.
1 change: 1 addition & 0 deletions Documentation/docs/how-to/configure-logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Setting `minlevel` to `Trace` will output all activity in the HiveMQtt package d
```log
2024-03-14 15:40:18.2252|TRACE|HiveMQtt.Client.HiveMQClient|Trace Level Logging Legend:
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(W)- == ConnectionWriter
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(PW)- == ConnectionPublishWriter
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(R)- == ConnectionReader
2024-03-14 15:40:18.2312|TRACE|HiveMQtt.Client.HiveMQClient| -(RPH)- == ReceivedPacketsHandler
2024-03-14 15:40:18.2320|INFO|HiveMQtt.Client.HiveMQClient|Connecting to broker at 127.0.0.1:1883
Expand Down
7 changes: 0 additions & 7 deletions Source/HiveMQtt/Client/Exceptions/HiveMQttClientException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,4 @@ public HiveMQttClientException(string message, Exception inner)
: base(message, inner)
{
}

protected HiveMQttClientException(
System.Runtime.Serialization.SerializationInfo info,
System.Runtime.Serialization.StreamingContext context)
: base(info, context)
{
}
}
53 changes: 34 additions & 19 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public HiveMQClient(HiveMQClientOptions? options = null)

Logger.Trace("Trace Level Logging Legend:");
Logger.Trace(" -(W)- == ConnectionWriter");
Logger.Trace(" -(PW)- == ConnectionPublishWriter");
Logger.Trace(" -(R)- == ConnectionReader");
Logger.Trace(" -(CM)- == ConnectionMonitor");
Logger.Trace(" -(RPH)- == ReceivedPacketsHandler");
Expand Down Expand Up @@ -91,7 +92,7 @@ public async Task<ConnectResult> ConnectAsync()
// Construct the MQTT Connect packet and queue to send
var connPacket = new ConnectPacket(this.Options);
Logger.Trace($"Queuing packet for send: {connPacket}");
this.SendQueue.Add(connPacket);
this.SendQueue.Enqueue(connPacket);

ConnAckPacket connAck;
ConnectResult connectResult;
Expand Down Expand Up @@ -162,11 +163,11 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
this.OnDisconnectSent += eventHandler;

Logger.Trace($"Queuing packet for send: {disconnectPacket}");
this.SendQueue.Add(disconnectPacket);
this.SendQueue.Enqueue(disconnectPacket);

try
{
disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
disconnectPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand Down Expand Up @@ -200,7 +201,7 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
if (message.QoS == QualityOfService.AtMostOnceDelivery)
{
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.SendQueue.Add(publishPacket);
this.OutgoingPublishQueue.Enqueue(publishPacket);
return new PublishResult(publishPacket.Message);
}
else if (message.QoS == QualityOfService.AtLeastOnceDelivery)
Expand All @@ -211,11 +212,10 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
EventHandler<OnPublishQoS1CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS1Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.SendQueue.Add(publishPacket);
this.OutgoingPublishQueue.Enqueue(publishPacket);

var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
var pubAckPacket = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);

publishPacket.OnPublishQoS1Complete -= eventHandler;
return new PublishResult(publishPacket.Message, pubAckPacket);
Expand All @@ -229,20 +229,31 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message)
EventHandler<OnPublishQoS2CompleteEventArgs> eventHandler = TaskHandler;
publishPacket.OnPublishQoS2Complete += eventHandler;

// Construct the MQTT Connect packet and queue to send
this.SendQueue.Add(publishPacket);
Logger.Trace($"Queuing packet for send: {publishPacket}");
this.OutgoingPublishQueue.Enqueue(publishPacket);

List<ControlPacket> packetList;
try
{
// Wait on the QoS 2 handshake
packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
// FIXME: Timeout value
packetList = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Logger.Error("PublishAsync: QoS 2 timeout. No response received in time.");
publishResult = new PublishResult(publishPacket.Message);
publishResult.QoS2ReasonCode = null;

// Remove the transaction chain
if (this.transactionQueue.Remove(publishPacket.PacketIdentifier, out var publishQoS2Chain))
{
Logger.Debug($"PublishAsync: QoS 2 timeout. Removing transaction chain for packet identifier {publishPacket.PacketIdentifier}.");
}

// Prepare PublishResult
publishResult = new PublishResult(publishPacket.Message)
{
QoS2ReasonCode = null,
};
publishPacket.OnPublishQoS2Complete -= eventHandler;
return publishResult;
}
Expand Down Expand Up @@ -329,13 +340,13 @@ public async Task<SubscribeResult> SubscribeAsync(SubscribeOptions options)
this.OnSubAckReceived += eventHandler;

// Queue the constructed packet to be sent on the wire
this.SendQueue.Add(subscribePacket);
this.SendQueue.Enqueue(subscribePacket);

SubAckPacket subAck;
SubscribeResult subscribeResult;
try
{
subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
subAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);
}
catch (TimeoutException)
{
Expand Down Expand Up @@ -438,14 +449,14 @@ public async Task<UnsubscribeResult> UnsubscribeAsync(UnsubscribeOptions unsubOp
EventHandler<OnUnsubAckReceivedEventArgs> eventHandler = TaskHandler;
this.OnUnsubAckReceived += eventHandler;

this.SendQueue.Add(unsubscribePacket);
this.SendQueue.Enqueue(unsubscribePacket);

// FIXME: Cancellation token and better timeout value
UnsubAckPacket unsubAck;
UnsubscribeResult unsubscribeResult;
try
{
unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(120)).ConfigureAwait(false);
unsubAck = await taskCompletionSource.Task.WaitAsync(TimeSpan.FromSeconds(60)).ConfigureAwait(false);

// FIXME: Validate that the packet identifier matches
}
Expand Down Expand Up @@ -497,15 +508,19 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)

if (clean)
{
if (this.SendQueue.Count > 0)
if (!this.SendQueue.IsEmpty)
{
Logger.Warn($"HandleDisconnection: Send queue not empty. {this.SendQueue.Count} packets pending but we are disconnecting.");
}

// We only clear the send queue on explicit disconnect
while (this.SendQueue.TryTake(out _))
if (!this.OutgoingPublishQueue.IsEmpty)
{
Logger.Warn($"HandleDisconnection: Outgoing publish queue not empty. {this.OutgoingPublishQueue.Count} packets pending but we are disconnecting.");
}

// We only clear the queues on explicit disconnect
this.SendQueue.Clear();
this.OutgoingPublishQueue.Clear();
}

// Fire the corresponding after event
Expand Down
Loading

0 comments on commit d043390

Please sign in to comment.