Skip to content

Commit

Permalink
Added a synchronous write loop for connections.
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Christiansen committed Sep 21, 2023
1 parent 6677851 commit 8a40935
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 8 deletions.
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,9 @@ projects/Unit*/TestResult.xml
# Vim
.sw?
.*.sw?


#################
## JetBrains Rider
#################
.idea/
9 changes: 8 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,13 @@ public TimeSpan ContinuationTimeout
/// </summary>
public bool TopologyRecoveryEnabled { get; set; } = true;

/// <summary>
/// Force writes to the socket to run on a dedicated thread instead of the thread pool. This may prevent
/// timeouts if a large number of blocking requests are going out simultaneously. Will become obsolete
/// once requests become asynchronous. Defaults to false.
/// </summary>
public bool EnableSynchronousWriteLoop { get; set; } = false;

/// <summary>
/// Filter to include/exclude entities from topology recovery.
/// Default filter includes all entities in topology recovery.
Expand Down Expand Up @@ -640,7 +647,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
{
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout, EnableSynchronousWriteLoop);
return ConfigureFrameHandler(fh);
}

Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/IProtocolExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ public static IFrameHandler CreateFrameHandler(
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout,
TimeSpan readTimeout,
TimeSpan writeTimeout)
TimeSpan writeTimeout,
bool enableSynchronousWriteLoop)
{
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout)
return new SocketFrameHandler(endpoint, socketFactory, connectionTimeout, readTimeout, writeTimeout, enableSynchronousWriteLoop)
{
MemoryPool = pool
};
Expand Down
38 changes: 33 additions & 5 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ public static async Task TimeoutAfter(this Task task, TimeSpan timeout)
class SocketFrameHandler : IFrameHandler
{
private readonly AmqpTcpEndpoint _endpoint;


// Socket poll timeout in ms. If the socket does not
// become writeable in this amount of time, we throw
// an exception.
Expand All @@ -78,19 +80,19 @@ class SocketFrameHandler : IFrameHandler
private readonly byte[] _frameHeaderBuffer;
private bool _closed;
private ArrayPool<byte> _pool = ArrayPool<byte>.Shared;
private readonly bool _enableSynchronousWriteLoop;

public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout, bool enableSynchronousWriteLoop)
{
_endpoint = endpoint;
_enableSynchronousWriteLoop = enableSynchronousWriteLoop;
_frameHeaderBuffer = new byte[6];
var channel = Channel.CreateUnbounded<ReadOnlyMemory<byte>>(
new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
SingleReader = true,
SingleWriter = false
AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false
});

_channelReader = channel.Reader;
Expand Down Expand Up @@ -134,7 +136,15 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);

WriteTimeout = writeTimeout;
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
if (_enableSynchronousWriteLoop)
{
TaskCreationOptions tco = TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach;
_writerTask = Task.Factory.StartNew(SynchronousWriteLoop, CancellationToken.None, tco, TaskScheduler.Default);
}
else
{
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
}
}

public AmqpTcpEndpoint Endpoint
Expand Down Expand Up @@ -281,6 +291,24 @@ private async Task WriteLoop()
}
}

private void SynchronousWriteLoop()
{
while (_channelReader.WaitToReadAsync().AsTask().Result)
{
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
while (_channelReader.TryRead(out var memory))
{
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment) &&
segment.Array != null)
{
_writer.Write(segment.Array, segment.Offset, segment.Count);
MemoryPool.Return(segment.Array);
}
}
_writer.Flush();
}
}

private static bool ShouldTryIPv6(AmqpTcpEndpoint endpoint)
{
return Socket.OSSupportsIPv6 && endpoint.AddressFamily != AddressFamily.InterNetwork;
Expand Down
14 changes: 14 additions & 0 deletions projects/Unit/TestConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,20 @@ public void TestCreateConnectionAmqpTcpEndpointListAndClientProvidedName()
}
}

[Test]
public void TestCreateConnectionWithSynchronousWriteLoop()
{
var cf = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
HostName = "localhost",
EnableSynchronousWriteLoop = true
};
using (IConnection conn = cf.CreateConnection()){
Assert.AreEqual(5672, conn.Endpoint.Port);
}
}

[Test]
public void TestCreateConnectionUsesDefaultPort()
{
Expand Down

0 comments on commit 8a40935

Please sign in to comment.