Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Benchmark and Duplex Pipe Concurrent Usage Fixes #27

Merged
merged 15 commits into from
Sep 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,25 @@ await client.ConnectAsync();

await client.Proxy.UpdateInfoAndWait(1, 2, "Custom Status");
```
## Benchmarks
```
BenchmarkDotNet v0.13.8, Windows 11 (10.0.22621.2283/22H2/2022Update/SunValley2)
AMD Ryzen 9 3900X, 1 CPU, 24 logical and 12 physical cores
.NET SDK 8.0.100-rc.1.23455.8
[Host] : .NET 7.0.10 (7.0.1023.36312), X64 RyuJIT AVX2
Job-ASQQIE : .NET 7.0.10 (7.0.1023.36312), X64 RyuJIT AVX2

Platform=X64 Runtime=.NET 7.0 MaxIterationCount=5
MaxWarmupIterationCount=3 MinIterationCount=3 MinWarmupIterationCount=1
```

| Method | Mean | Error | StdDev | Op/s | Gen0 | Gen1 | Allocated |
|------------------------------------- |---------:|---------:|---------:|---------:|-------:|-------:|----------:|
| InvocationNoArgument | 38.66 us | 2.070 us | 0.537 us | 25,867.9 | 0.0610 | - | 681 B |
| InvocationUnmanagedArgument | 38.35 us | 2.496 us | 0.648 us | 26,078.6 | 0.0610 | - | 737 B |
| InvocationUnmanagedMultipleArguments | 38.98 us | 2.044 us | 0.531 us | 25,654.8 | 0.0610 | - | 785 B |
| InvocationNoArgumentWithResult | 38.22 us | 0.752 us | 0.195 us | 26,166.8 | 0.0610 | - | 721 B |
| InvocationWithDuplexPipe_Upload | 64.48 us | 2.690 us | 0.416 us | 15,509.1 | 2.0752 | 0.4883 | 14142 B |

## Method Invocation Table
Some methods are handled differently based upon the arguments passed and there are limitations placed upon the types of arguments which can be used together. Most of these incompatibilities handled with Diagnostic Errors provided by the `NexNet.Generator`. Below is a table which shows valid combinations of arguments and return values.
Expand Down
7 changes: 6 additions & 1 deletion src/NexNet.IntegrationTests/BaseTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using NexNet.Quic;
using NexNet.Transports;
using NUnit.Framework;
using NUnit.Framework.Interfaces;
using NUnit.Framework.Internal;

namespace NexNet.IntegrationTests;
Expand Down Expand Up @@ -38,7 +39,6 @@ public enum Type
[OneTimeSetUp]
public void OneTimeSetUp()
{

//_logger = new ConsoleLogger();
Trace.Listeners.Add(new ConsoleTraceListener());
_socketDirectory = Directory.CreateTempSubdirectory("socketTests");
Expand All @@ -62,6 +62,11 @@ public virtual void SetUp()
[TearDown]
public virtual void TearDown()
{
if (TestContext.CurrentContext.Result.Outcome != ResultState.Success)
{
_logger.Flush(TestContext.Out);
}

CurrentPath = null;
CurrentTcpPort = null;
CurrentUdpPort = null;
Expand Down
59 changes: 49 additions & 10 deletions src/NexNet.IntegrationTests/ConsoleLogger.cs
Original file line number Diff line number Diff line change
@@ -1,25 +1,35 @@
using System.Diagnostics;
using NUnit.Framework;
using System;
using System.Diagnostics;
using System.IO;

namespace NexNet.IntegrationTests;

public class ConsoleLogger : INexusLogger
{
private readonly string _prefix = "";
private readonly Stopwatch _sw;
private readonly TextWriter _outWriter;
private readonly string[]? _lines;
private int _currentLineIndex = 0;
private int _totalLinesWritten = 0;

public string? Category { get; }
public string? Category { get; set; }

public bool LogEnabled { get; set; } = true;

private readonly ConsoleLogger _baseLogger;

public ConsoleLogger()
public INexusLogger.LogLevel MinLogLevel { get; set; } = INexusLogger.LogLevel.Trace;

public int TotalLinesWritten
{
get => _totalLinesWritten;
}

public ConsoleLogger(int maxLines = 200)
{
_lines = new string[maxLines];
_baseLogger = this;
_sw = Stopwatch.StartNew();
_outWriter = TestContext.Out;
}

private ConsoleLogger(ConsoleLogger baseLogger, string? category, string prefix = "")
Expand All @@ -28,7 +38,6 @@ private ConsoleLogger(ConsoleLogger baseLogger, string? category, string prefix
_prefix = prefix;
Category = category;
_sw = baseLogger._sw;
_outWriter = baseLogger._outWriter;
}


Expand All @@ -37,16 +46,46 @@ public void Log(INexusLogger.LogLevel logLevel, string? category, Exception? exc
if (!_baseLogger.LogEnabled)
return;

_outWriter.WriteLine($"[{_sw.ElapsedTicks/(double)Stopwatch.Frequency:0.000000}]{_prefix} [{category}]: {message} {exception}");
if (logLevel < MinLogLevel)
return;

lock (_baseLogger._lines!)
{
_baseLogger._lines[_baseLogger._currentLineIndex] =
$"[{_sw.ElapsedTicks / (double)Stopwatch.Frequency:0.000000}]{_prefix} [{category}]: {message} {exception}";
_baseLogger._currentLineIndex = (_baseLogger._currentLineIndex + 1) % _baseLogger._lines.Length;
_baseLogger._totalLinesWritten++;
}
}

public INexusLogger CreateLogger(string? category)
{
return new ConsoleLogger(_baseLogger, category, _prefix);
return new ConsoleLogger(_baseLogger, category, _prefix) { MinLogLevel = MinLogLevel };
}

public INexusLogger CreateLogger(string? category, string prefix)
{
return new ConsoleLogger(_baseLogger, category, prefix);
return new ConsoleLogger(_baseLogger, category, prefix) { MinLogLevel = MinLogLevel };
}

public void Flush(TextWriter writer)
{
if (_baseLogger._totalLinesWritten == 0)
return;

var startIndex = _baseLogger._currentLineIndex;
var maxLines = _baseLogger._lines!.Length;

if (_baseLogger._totalLinesWritten > maxLines)
{
writer.WriteLine($"Truncating Log. Showing only last {maxLines} out of {_baseLogger._totalLinesWritten} total lines written.");
}
for (int i = 0; i < maxLines; i++)
{
writer.WriteLine(_baseLogger._lines![(startIndex + i) % maxLines]);
}

_baseLogger._currentLineIndex = 0;
_baseLogger._totalLinesWritten = 0;
}
}
20 changes: 10 additions & 10 deletions src/NexNet.IntegrationTests/NexusServerTests_NexusInvocations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public async Task NexusInvokesOnAll(Type type)
connectedNexus.OnConnectedEvent = async nexus =>
{
// Second connection
if (++connectedCount == 2)
if (Interlocked.Increment(ref connectedCount) == 2)
{
await nexus.Context.Clients.All.ClientTask();
}
Expand Down Expand Up @@ -199,7 +199,7 @@ public async Task NexusInvokesOnGroup(Type type)
{
nexus.Context.Groups.Add("group");
// Second connection
if (++connectedCount == 2)
if (Interlocked.Increment(ref connectedCount) == 2)
{
await nexus.Context.Clients.Group("group").ClientTask();
}
Expand Down Expand Up @@ -234,7 +234,7 @@ public async Task NexusInvokesOnGroups(Type type)
{
connectedNexus.OnConnectedEvent = async nexus =>
{
if (++connectedCount == 1) {
if (Interlocked.Increment(ref connectedCount) == 1) {
nexus.Context.Groups.Add("group");
}
// Second connection
Expand Down Expand Up @@ -276,7 +276,7 @@ public async Task NexusInvokesOnOthers(Type type)
connectedNexus.OnConnectedEvent = async nexus =>
{
// Second connection
if (++connectedCount == 2)
if (Interlocked.Increment(ref connectedCount) == 2)
{
await nexus.Context.Clients.Others.ClientTask();
await Task.Delay(10);
Expand All @@ -288,8 +288,8 @@ public async Task NexusInvokesOnOthers(Type type)
var (client1, clientNexus1) = CreateClient(CreateClientConfig(type));
var (client2, clientNexus2) = CreateClient(CreateClientConfig(type));
#pragma warning disable CS1998
clientNexus1.ClientTaskEvent = async _ => invocationCount++;
clientNexus2.ClientTaskEvent = async _ => invocationCount++;
clientNexus1.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
clientNexus2.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
#pragma warning restore CS1998
await server.StartAsync().Timeout(1);

Expand All @@ -316,7 +316,7 @@ public async Task NexusInvokesOnClient(Type type)
connectedNexus.OnConnectedEvent = async nexus =>
{
// Second connection
if (++connectedCount == 2)
if (Interlocked.Increment(ref connectedCount) == 2)
{
await nexus.Context.Clients.Client(nexus.Context.Id).ClientTask();
await Task.Delay(10);
Expand All @@ -328,8 +328,8 @@ public async Task NexusInvokesOnClient(Type type)
var (client1, clientNexus1) = CreateClient(CreateClientConfig(type));
var (client2, clientNexus2) = CreateClient(CreateClientConfig(type));
#pragma warning disable CS1998
clientNexus1.ClientTaskEvent = async _ => invocationCount++;
clientNexus2.ClientTaskEvent = async _ => invocationCount++;
clientNexus1.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
clientNexus2.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
#pragma warning restore CS1998
await server.StartAsync().Timeout(1);

Expand All @@ -356,7 +356,7 @@ public async Task NexusInvokesOnClients(Type type)
connectedNexus.OnConnectedEvent = async nexus =>
{
// Second connection
if (++connectedCount == 2)
if (Interlocked.Increment(ref connectedCount) == 2)
{
// ReSharper disable once AccessToModifiedClosure
var clientIds = server!.GetContext().Clients.GetIds().ToArray();
Expand Down
32 changes: 8 additions & 24 deletions src/NexNet.IntegrationTests/Pipes/NexusChannelReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@

namespace NexNet.IntegrationTests.Pipes;

internal class NexusChannelReaderTests
internal class NexusChannelReaderTests : NexusChannelTestBase
{
[Test]
public async Task ReadsData()
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);

var baseObject = ComplexMessage.Random();
Expand Down Expand Up @@ -48,7 +48,7 @@ public async Task ReadsData()
[Test]
public async Task ReadsPartialData()
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);

var baseObject = ComplexMessage.Random();
Expand Down Expand Up @@ -87,7 +87,7 @@ public async Task ReadsPartialData()
[Test]
public async Task CancelsReadDelayed()
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
var cts = new CancellationTokenSource(100);
var result = await reader.ReadAsync(cts.Token).Timeout(1);
Expand All @@ -100,7 +100,7 @@ public async Task CancelsReadDelayed()
[Test]
public async Task CancelsReadImmediate()
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
var cts = new CancellationTokenSource(100);
cts.Cancel();
Expand All @@ -114,7 +114,7 @@ public async Task CancelsReadImmediate()
[Test]
public async Task Completes()
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
// ReSharper disable once MethodHasAsyncOverload
await pipeReader.CompleteAsync();
Expand All @@ -130,7 +130,7 @@ public async Task Completes()
public async Task WaitsForFullData()
{
var tcs = new TaskCompletionSource();
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
var baseObject = ComplexMessage.Random();
var bytes = new ReadOnlySequence<byte>(MemoryPackSerializer.Serialize(baseObject));
Expand All @@ -153,7 +153,7 @@ public async Task WaitsForFullData()
public async Task ReadsMultiple()
{
const int iterations = 1000;
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
var baseObject = ComplexMessage.Random();
var bytes = new ReadOnlySequence<byte>(MemoryPackSerializer.Serialize(baseObject));
Expand All @@ -171,20 +171,4 @@ public async Task ReadsMultiple()

Assert.AreEqual(iterations, result.Count());
}
private class DummyPipeStateManager : IPipeStateManager
{
public ushort Id { get; } = 0;
public ValueTask NotifyState()
{
return default;
}

public bool UpdateState(NexusDuplexPipe.State updatedState, bool remove = false)
{
CurrentState |= updatedState;
return true;
}

public NexusDuplexPipe.State CurrentState { get; private set; } = NexusDuplexPipe.State.Ready;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class NexusChannelReaderUnmanagedTests
public async Task ReadsData<T>(T inputData)
where T : unmanaged
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);

var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
await pipeReader.BufferData(Utilities.GetBytes(inputData));
Expand All @@ -32,7 +32,7 @@ public async Task ReadsData<T>(T inputData)
public async Task CancelsReadDelayed<T>()
where T : unmanaged
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReaderUnmanaged<long>(pipeReader);
var cts = new CancellationTokenSource(100);
var result = await reader.ReadAsync(cts.Token).Timeout(1);
Expand All @@ -45,7 +45,7 @@ public async Task CancelsReadDelayed<T>()
[Test]
public async Task CancelsReadImmediate()
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReaderUnmanaged<long>(pipeReader);
var cts = new CancellationTokenSource(100);
cts.Cancel();
Expand All @@ -59,7 +59,7 @@ public async Task CancelsReadImmediate()
[Test]
public async Task Completes()
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReaderUnmanaged<long>(pipeReader);

// ReSharper disable once MethodHasAsyncOverload
Expand Down Expand Up @@ -87,7 +87,7 @@ public async Task WaitsForFullData<T>(T inputData)
where T : unmanaged
{
var tcs = new TaskCompletionSource();
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);

_ = Task.Run(async () =>
Expand Down Expand Up @@ -121,7 +121,7 @@ public async Task ReadsMultiple<T>(T inputData)
where T : unmanaged
{
const int iterations = 1000;
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
var data = Utilities.GetBytes(inputData);
var count = 0;
Expand Down Expand Up @@ -155,7 +155,7 @@ public async Task ReadsMultipleParallel<T>(T inputData)
where T : unmanaged
{
const int iterations = 1000;
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
var data = Utilities.GetBytes(inputData);
var count = 0;
Expand Down Expand Up @@ -202,7 +202,7 @@ await Task.Run(async () =>
public async Task ReadsWithPartialWrites<T>(T inputData)
where T : unmanaged
{
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
var data = Utilities.GetBytes(inputData);

Expand Down
Loading