Skip to content

Commit

Permalink
Merge pull request #4 from Fresa/dispose-network-client-as-client-is-…
Browse files Browse the repository at this point in the history
…disposed

Dispose network client as client is disposed
  • Loading branch information
Fresa authored Feb 12, 2022
2 parents a935148 + a880855 commit 4cad71b
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 47 deletions.
24 changes: 20 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,26 @@ jobs:
- name: Determine Release Info
id: release
run: |
from_tag=$(git tag --points-at ${{ steps.gitversion.outputs.versionSourceSha }} | grep -m 1 ^v[0-9]*\.[0-9]*\.[0-9]* | head -1)
[[ -z "$from_tag" ]] && \
from_ref_exclusive=${{ steps.gitversion.outputs.versionSourceSha }} || \
from_ref_exclusive=$from_tag
default_branch=$(git remote show origin | awk '/HEAD branch/ {print $NF}')
if [ "${{ github.ref_name }}" == "$default_branch" ]; then
# Extract the branch name of the branch that merged into the current commit
commit_subject=$(git log -1 --pretty=format:%s)
regex='Merge pull request #[0-9]+ from .+/(.+)$'
[[ $commit_subject =~ $regex ]]
merged_branch=${BASH_REMATCH[1]}
[[ -z "$merged_branch" ]] && \
# Committed directly on default branch, use the previous commit
from_ref_exclusive=$(git log -2 --pretty=format:"%H" | tail -1)
# Find what commit the merged branch branched from originally
from_ref_exclusive=$(diff -u <(git rev-list --first-parent $merged_branch) \
<(git rev-list --first-parent $default_branch) | \
sed -ne 's/^ //p' | head -1)
else
# Get the commit this branch branched from
from_ref_exclusive=$(git rev-list $(git rev-list $default_branch.. | tail -1)^ -n 1)
fi
[[ -z "${{ steps.gitversion.outputs.preReleaseTag }}" ]] && \
is_prerelease=false || \
Expand Down
79 changes: 39 additions & 40 deletions src/Kafka.TestFramework/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,52 +42,49 @@ protected void StartReceiving()
{
var cancellationToken = _signalNoMoreDataToWrite.Token;
var writer = _pipe.Writer;
await using (_networkClient.ConfigureAwait(false))
try
{
try
FlushResult result;
do
{
FlushResult result;
do
{
var memory = writer.GetMemory(MinimumBufferSize);
var bytesRead = await _networkClient.ReceiveAsync(
memory,
cancellationToken)
.ConfigureAwait(false);
var memory = writer.GetMemory(MinimumBufferSize);
var bytesRead = await _networkClient.ReceiveAsync(
memory,
cancellationToken)
.ConfigureAwait(false);

if (bytesRead == 0)
{
break;
}
if (bytesRead == 0)
{
break;
}

Logger.Debug("Received {bytesRead} bytes", bytesRead);
writer.Advance(bytesRead);
Logger.Debug("Received {bytesRead} bytes", bytesRead);
writer.Advance(bytesRead);

result = await writer
.FlushAsync(cancellationToken)
.ConfigureAwait(false);
} while (result.IsCanceled == false &&
result.IsCompleted == false);
}
catch when (_signalNoMoreDataToWrite.IsCancellationRequested)
{
// Shutdown in progress
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
writer.Complete(ex);
throw;
}
finally
{
_signalNoMoreDataToWrite.Cancel();
}

writer.Complete();
result = await writer
.FlushAsync(cancellationToken)
.ConfigureAwait(false);
} while (result.IsCanceled == false &&
result.IsCompleted == false);
}
catch when (_signalNoMoreDataToWrite.IsCancellationRequested)
{
// Shutdown in progress
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
writer.Complete(ex);
throw;
}
finally
{
_signalNoMoreDataToWrite.Cancel();
}

writer.Complete();
});
}

Expand All @@ -98,6 +95,8 @@ public async ValueTask DisposeAsync()

await _sendAndReceiveBackgroundTask
.ConfigureAwait(false);
await _networkClient.DisposeAsync()
.ConfigureAwait(false);
}
}
}
8 changes: 5 additions & 3 deletions src/Kafka.TestFramework/KafkaTestFramework.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,13 @@ public StopOnDispose(KafkaTestFramework testFramework)
{
_testFramework = testFramework;
}
public async ValueTask DisposeAsync()

public ValueTask DisposeAsync()
{
_testFramework._cancellationTokenSource.Cancel();
await Task.WhenAll(_testFramework._backgroundTasks)
.ConfigureAwait(false);
return Task.WhenAll(_testFramework._backgroundTasks)
.ThrowAllExceptions()
.AsValueTask();
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions src/Kafka.TestFramework/TaskExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

namespace Kafka.TestFramework
{
internal static class TaskExtensions
{
internal static async Task ThrowAllExceptions(this Task task)
{
try
{
await task.ConfigureAwait(false);
}
catch
{
if (task.Exception?.InnerExceptions.Count > 1)
{
ExceptionDispatchInfo.Capture(task.Exception).Throw();
}

throw;
}
}

internal static ValueTask AsValueTask(this Task task) =>
task.IsCompletedSuccessfully ? default : new ValueTask(task);
}
}

0 comments on commit 4cad71b

Please sign in to comment.