Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New BoundedDictionary #173

Merged
merged 8 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions Source/HiveMQtt/Client/HiveMQClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,12 @@ public HiveMQClient(HiveMQClientOptions? options = null)

this.Options = options;
this.cancellationTokenSource = new CancellationTokenSource();
this.ClientReceiveSemaphore = new SemaphoreSlim(this.Options.ClientReceiveMaximum);

// In-flight transaction queues
this.IPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(this.Options.ClientReceiveMaximum);

// Set protocol default until ConnAck is received
this.BrokerReceiveSemaphore = new SemaphoreSlim(65535);
this.OPubTransactionQueue = new BoundedDictionaryX<int, List<ControlPacket>>(65535);
}

/// <inheritdoc />
Expand Down Expand Up @@ -140,9 +142,15 @@ public async Task<ConnectResult> ConnectAsync()
/// <inheritdoc />
public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
{
if (this.ConnectState == ConnectState.Disconnecting)
{
// We're already disconnecting in another task.
return true;
}

if (this.ConnectState != ConnectState.Connected)
{
Logger.Warn("DisconnectAsync called but this client is not connected. State is ${this.ConnectState}.");
Logger.Warn($"DisconnectAsync called but this client is not connected. State is {this.ConnectState}.");
return false;
}

Expand Down Expand Up @@ -510,10 +518,7 @@ private async Task<bool> HandleDisconnectionAsync(bool clean = true)
// Cancel all background tasks and close the socket
this.ConnectState = ConnectState.Disconnected;

// Don't use CancelAsync here to maintain backwards compatibility
// with >=.net6.0. CancelAsync was introduced in .net8.0
this.cancellationTokenSource.Cancel();
this.CloseSocket();
await this.CloseSocketAsync().ConfigureAwait(false);

if (clean)
{
Expand Down
78 changes: 65 additions & 13 deletions Source/HiveMQtt/Client/HiveMQClientSocket.cs
Original file line number Diff line number Diff line change
Expand Up @@ -254,25 +254,17 @@ private async Task<bool> CreateTLSConnectionAsync(Stream stream)
}
}

internal bool CloseSocket(bool? shutdownPipeline = true)
internal async Task<bool> CloseSocketAsync(bool? shutdownPipeline = true)
{
// Cancel the background traffic processing tasks
this.cancellationTokenSource.Cancel();

// Reset the tasks
this.ConnectionPublishWriterTask = null;
this.ConnectionWriterTask = null;
this.ConnectionReaderTask = null;
this.ReceivedPacketsHandlerTask = null;
this.ConnectionMonitorTask = null;
await this.CancelBackgroundTasksAsync().ConfigureAwait(false);

if (shutdownPipeline == true)
{
if (this.Reader != null && this.Writer != null)
{
// Dispose of the PipeReader and PipeWriter
this.Reader.Complete();
this.Writer.Complete();
await this.Reader.CompleteAsync().ConfigureAwait(false);
await this.Writer.CompleteAsync().ConfigureAwait(false);

// Shutdown the pipeline
this.Reader = null;
Expand All @@ -284,7 +276,7 @@ internal bool CloseSocket(bool? shutdownPipeline = true)
{
// Dispose of the Stream
this.Stream.Close();
this.Stream.Dispose();
await this.Stream.DisposeAsync().ConfigureAwait(false);
this.Stream = null;
}

Expand All @@ -300,4 +292,64 @@ internal bool CloseSocket(bool? shutdownPipeline = true)

return true;
}

/// <summary>
/// Cancel all background tasks.
/// </summary>
/// <returns>A task representing the asynchronous operation.</returns>
internal async Task CancelBackgroundTasksAsync()
{
// Don't use CancelAsync here to maintain backwards compatibility
// with >=.net6.0. CancelAsync was introduced in .net8.0
this.cancellationTokenSource.Cancel();

// Delay for a short period to allow the tasks to cancel
await Task.Delay(1000).ConfigureAwait(false);

// Reset the tasks
if (this.ConnectionPublishWriterTask is not null && this.ConnectionPublishWriterTask.IsCompleted)
{
this.ConnectionPublishWriterTask = null;
}
else
{
Logger.Error("ConnectionPublishWriterTask did not complete");
}

if (this.ConnectionWriterTask is not null && this.ConnectionWriterTask.IsCompleted)
{
this.ConnectionWriterTask = null;
}
else
{
Logger.Error("ConnectionWriterTask did not complete");
}

if (this.ConnectionReaderTask is not null && this.ConnectionReaderTask.IsCompleted)
{
this.ConnectionReaderTask = null;
}
else
{
Logger.Error("ConnectionReaderTask did not complete");
}

if (this.ReceivedPacketsHandlerTask is not null && this.ReceivedPacketsHandlerTask.IsCompleted)
{
this.ReceivedPacketsHandlerTask = null;
}
else
{
Logger.Error("ReceivedPacketsHandlerTask did not complete");
}

if (this.ConnectionMonitorTask is not null && this.ConnectionMonitorTask.IsCompleted)
{
this.ConnectionMonitorTask = null;
}
else
{
Logger.Error("ConnectionMonitorTask did not complete");
}
}
}
Loading
Loading