Skip to content

Commit

Permalink
Add Periodic Internal Health Checks (#170)
Browse files Browse the repository at this point in the history
  • Loading branch information
pglombardo authored Jun 4, 2024
1 parent c1b08ce commit 580b99a
Showing 1 changed file with 39 additions and 21 deletions.
60 changes: 39 additions & 21 deletions Source/HiveMQtt/Client/HiveMQClientTrafficProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,32 @@ public partial class HiveMQClient : IDisposable, IHiveMQClient

private readonly Stopwatch lastCommunicationTimer = new();

/// <summary>
/// Health check method to assure that tasks haven't faulted unexpectedly.
/// </summary>
private async Task RunTaskHealthCheckAsync(Task? task, string taskName)
{
if (task is null)
{
Logger.Info($"{this.Options.ClientId}-(CM)- {taskName} is not running.");
}
else
{
if (task.IsFaulted)
{
Logger.Error($"{this.Options.ClientId}-(CM)- {taskName} Faulted: {task.Exception}");
Logger.Error($"{this.Options.ClientId}-(CM)- {taskName} died. Disconnecting.");
_ = await this.HandleDisconnectionAsync(false).ConfigureAwait(false);
}
}
}

/// <summary>
/// Asynchronous background task that monitors the connection state and sends PingReq packets when
/// necessary.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A boolean return indicating exit state.</returns>
private Task<bool> ConnectionMonitorAsync(CancellationToken cancellationToken) => Task.Run(
private Task ConnectionMonitorAsync(CancellationToken cancellationToken) => Task.Run(
async () =>
{
var keepAlivePeriod = this.Options.KeepAlive / 2;
Expand All @@ -77,34 +96,35 @@ private Task<bool> ConnectionMonitorAsync(CancellationToken cancellationToken) =

// Dumping Client State
Logger.Trace($"{this.Options.ClientId}-(CM)- {this.ConnectState} lastCommunicationTimer:{this.lastCommunicationTimer.Elapsed}");
Logger.Trace($"{this.Options.ClientId}-(CM)- SendQueue:{this.SendQueue.Count} ReceivedQueue:{this.ReceivedQueue.Count} OutgoingPublishQueue:{this.OutgoingPublishQueue.Count}");
Logger.Trace($"{this.Options.ClientId}-(CM)- TransactionQueue:{this.TransactionQueue.Count}");
Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionMonitor:{this.ConnectionMonitorTask?.Status}");
Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionPublishWriter:{this.ConnectionPublishWriterTask?.Status}");
Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionWriter:{this.ConnectionWriterTask?.Status}");
Logger.Trace($"{this.Options.ClientId}-(CM)- - ConnectionReader:{this.ConnectionReaderTask?.Status}");
Logger.Trace($"{this.Options.ClientId}-(CM)- - ReceivedPacketsHandler:{this.ReceivedPacketsHandlerTask?.Status}");
Logger.Trace($"{this.Options.ClientId}-(CM)- SendQueue:............{this.SendQueue.Count}");
Logger.Trace($"{this.Options.ClientId}-(CM)- ReceivedQueue:........{this.ReceivedQueue.Count}");
Logger.Trace($"{this.Options.ClientId}-(CM)- OutgoingPublishQueue:.{this.OutgoingPublishQueue.Count}");
Logger.Trace($"{this.Options.ClientId}-(CM)- TransactionQueue:.....{this.TransactionQueue.Count}");
Logger.Trace($"{this.Options.ClientId}-(CM)- # of Subscriptions:...{this.Subscriptions.Count}");

await this.RunTaskHealthCheckAsync(this.ConnectionWriterTask, "ConnectionWriter").ConfigureAwait(false);
await this.RunTaskHealthCheckAsync(this.ConnectionReaderTask, "ConnectionReader").ConfigureAwait(false);
await this.RunTaskHealthCheckAsync(this.ConnectionPublishWriterTask, "ConnectionPublishWriter").ConfigureAwait(false);
await this.RunTaskHealthCheckAsync(this.ReceivedPacketsHandlerTask, "ReceivedPacketsHandler").ConfigureAwait(false);

try
{
await Task.Delay(2000, cancellationToken).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
Logger.Trace($"{this.Options.ClientId}-(CM)- Cancelled");
break;
Logger.Info($"{this.Options.ClientId}-(CM)- Cancelled");
return;
}
}

Logger.Trace($"{this.Options.ClientId}-(CM)- Exiting...{this.ConnectState}");

return true;
}, cancellationToken);

/// <summary>
/// Asynchronous background task that handles the outgoing publish packets queued in OutgoingPublishQueue.
/// </summary>
private Task<bool> ConnectionPublishWriterAsync(CancellationToken cancellationToken) => Task.Run(
private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) => Task.Run(
async () =>
{
this.lastCommunicationTimer.Start();
Expand Down Expand Up @@ -168,13 +188,12 @@ private Task<bool> ConnectionPublishWriterAsync(CancellationToken cancellationTo
} // while(true)

Logger.Trace($"{this.Options.ClientId}-(PW)- ConnectionPublishWriter Exiting...{this.ConnectState}");
return true;
}, cancellationToken);

/// <summary>
/// Asynchronous background task that handles the outgoing traffic of packets queued in the sendQueue.
/// </summary>
private Task<bool> ConnectionWriterAsync(CancellationToken cancellationToken) => Task.Run(
private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.Run(
async () =>
{
this.lastCommunicationTimer.Start();
Expand Down Expand Up @@ -280,7 +299,7 @@ private Task<bool> ConnectionWriterAsync(CancellationToken cancellationToken) =>
} // while(true)

Logger.Trace($"{this.Options.ClientId}-(W)- ConnectionWriter Exiting...{this.ConnectState}");
return true;
return;
}, cancellationToken);

/// <summary>
Expand Down Expand Up @@ -383,8 +402,7 @@ private Task<bool> ConnectionReaderAsync(CancellationToken cancellationToken) =>
/// Continually processes the packets queued in the receivedQueue.
/// </summary>
/// <param name="cancellationToken">The cancellation token to stop the task.</param>
/// <returns>A fairly worthless boolean.</returns>
private Task<bool> ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Task.Run(
private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) => Task.Run(
async () =>
{
Logger.Trace($"{this.Options.ClientId}-(RPH)- Starting...{this.ConnectState}");
Expand Down Expand Up @@ -412,7 +430,7 @@ private Task<bool> ReceivedPacketsHandlerAsync(CancellationToken cancellationTok
ReasonString = "Packet Too Large",
};
await this.DisconnectAsync(opts).ConfigureAwait(false);
return false;
return;
}
}

Expand Down Expand Up @@ -462,7 +480,7 @@ private Task<bool> ReceivedPacketsHandlerAsync(CancellationToken cancellationTok
} // while (true)

Logger.Trace($"{this.Options.ClientId}-(RPH)- ReceivedPacketsHandler Exiting...{this.ConnectState}");
return true;
return;
}, cancellationToken);

/// <summary>
Expand Down

0 comments on commit 580b99a

Please sign in to comment.