Skip to content

Commit

Permalink
spanification of WireFormatting & Reader/Writer
Browse files Browse the repository at this point in the history
  • Loading branch information
bollhals committed Jun 2, 2020
1 parent 4c34f6c commit d0d3032
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 163 deletions.
8 changes: 4 additions & 4 deletions projects/Apigen/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -952,14 +952,14 @@ public void EmitClassMethodImplementations(AmqpClass c)

public void EmitMethodArgumentReader()
{
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
EmitLine(" {");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
EmitLine(" if(result != null)");
EmitLine(" {");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
EmitLine(" result.ReadArgumentsFrom(ref reader);");
EmitLine(" return result;");
EmitLine(" }");
Expand Down
8 changes: 5 additions & 3 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@ public Command HandleFrame(in InboundFrame f)
{
throw new UnexpectedFrameException(f.Type);
}
m_method = m_protocol.DecodeMethodFrom(f.Payload);
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
return CompletedCommand();
case AssemblyState.ExpectingContentHeader:
if (!f.IsHeader())
{
throw new UnexpectedFrameException(f.Type);
}
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));

ReadOnlySpan<byte> span = f.Payload.Span;
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
ulong totalBodyBytes = m_header.ReadFrom(span.Slice(2));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(f.Type);
Expand Down
11 changes: 5 additions & 6 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ public virtual object Clone()
///<summary>
/// Fill this instance from the given byte buffer stream.
///</summary>
internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
internal ulong ReadFrom(ReadOnlySpan<byte> span)
{
// Skipping the first two bytes since they arent used (weight - not currently used)
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2).Span);
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(memory.Slice(10));
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(span.Slice(2));
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span.Slice(10));
ReadPropertiesFrom(ref reader);
return bodySize;
}
Expand All @@ -81,13 +81,12 @@ internal ulong ReadFrom(ReadOnlyMemory<byte> memory)

private const ushort ZERO = 0;

internal int WriteTo(Memory<byte> memory, ulong bodySize)
internal int WriteTo(Span<byte> span, ulong bodySize)
{
var span = memory.Span;
NetworkOrderSerializer.WriteUInt16(span, ZERO); // Weight - not used
NetworkOrderSerializer.WriteUInt64(span.Slice(2), bodySize);

ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(span.Slice(10));
WritePropertiesTo(ref writer);
return 10 + writer.Offset;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,16 @@ internal ref struct ContentHeaderPropertyReader
private const int StartBitMask = 0b1000_0000_0000_0000;
private const int EndBitMask = 0b0000_0000_0000_0001;

private readonly ReadOnlyMemory<byte> _memory;
private readonly ReadOnlySpan<byte> _span;
private int _offset;
private int _bitMask;
private int _bits;

private ReadOnlySpan<byte> Span => _span.Slice(_offset);
private ReadOnlyMemory<byte> Memory => _memory.Slice(_offset);

public ContentHeaderPropertyReader(ReadOnlyMemory<byte> memory)
public ContentHeaderPropertyReader(ReadOnlySpan<byte> span)
{
_memory = memory;
_span = memory.Span;
_span = span;
_offset = 0;
_bitMask = EndBitMask; // force a flag read
_bits = 1; // just the continuation bit
Expand Down Expand Up @@ -141,15 +138,15 @@ public ushort ReadShort()

public string ReadShortstr()
{
string result = WireFormatting.ReadShortstr(Memory, out int bytesRead);
string result = WireFormatting.ReadShortstr(Span, out int bytesRead);
_offset += bytesRead;
return result;
}

/// <returns>A type of <seealso cref="System.Collections.Generic.IDictionary{TKey,TValue}"/>.</returns>
public Dictionary<string, object> ReadTable()
{
Dictionary<string, object> result = WireFormatting.ReadTable(Memory, out int bytesRead);
Dictionary<string, object> result = WireFormatting.ReadTable(Span, out int bytesRead);
_offset += bytesRead;
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ internal ref struct ContentHeaderPropertyWriter
private const ushort StartBitMask = 0b1000_0000_0000_0000;
private const ushort EndBitMask = 0b0000_0000_0000_0001;

private readonly Memory<byte> _memory;
private readonly Span<byte> _span;
private int _offset;
private ushort _bitAccumulator;
Expand All @@ -59,12 +58,10 @@ internal ref struct ContentHeaderPropertyWriter
public int Offset => _offset;

private Span<byte> Span => _span.Slice(_offset);
private Memory<byte> Memory => _memory.Slice(_offset);

public ContentHeaderPropertyWriter(Memory<byte> memory)
public ContentHeaderPropertyWriter(Span<byte> span)
{
_memory = memory;
_span = _memory.Span;
_span = span;
_offset = 0;
_bitAccumulator = 0;
_bitMask = StartBitMask;
Expand Down Expand Up @@ -124,12 +121,12 @@ public void WriteShort(ushort val)

public void WriteShortstr(string val)
{
_offset += WireFormatting.WriteShortstr(Memory, val);
_offset += WireFormatting.WriteShortstr(Span, val);
}

public void WriteTable(IDictionary<string, object> val)
{
_offset += WireFormatting.WriteTable(Memory, val);
_offset += WireFormatting.WriteTable(Span, val);
}

public void WriteTimestamp(AmqpTimestamp val)
Expand Down
24 changes: 11 additions & 13 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ internal override int GetMinimumPayloadBufferSize()
return 2 + _header.GetRequiredBufferSize();
}

internal override int WritePayload(Memory<byte> memory)
internal override int WritePayload(Span<byte> span)
{
// write protocol class id (2 bytes)
NetworkOrderSerializer.WriteUInt16(memory.Span, _header.ProtocolClassId);
NetworkOrderSerializer.WriteUInt16(span, _header.ProtocolClassId);
// write header (X bytes)
int bytesWritten = _header.WriteTo(memory.Slice(2), (ulong)_bodyLength);
int bytesWritten = _header.WriteTo(span.Slice(2), (ulong)_bodyLength);
return bytesWritten + 2;
}
}
Expand All @@ -90,9 +90,9 @@ internal override int GetMinimumPayloadBufferSize()
return _body.Length;
}

internal override int WritePayload(Memory<byte> memory)
internal override int WritePayload(Span<byte> span)
{
_body.CopyTo(memory);
_body.Span.CopyTo(span);
return _body.Length;
}
}
Expand All @@ -112,12 +112,11 @@ internal override int GetMinimumPayloadBufferSize()
return 4 + _method.GetRequiredBufferSize();
}

internal override int WritePayload(Memory<byte> memory)
internal override int WritePayload(Span<byte> span)
{
var span = memory.Span;
NetworkOrderSerializer.WriteUInt16(span, _method.ProtocolClassId);
NetworkOrderSerializer.WriteUInt16(span.Slice(2), _method.ProtocolMethodId);
var argWriter = new MethodArgumentWriter(memory.Slice(4));
var argWriter = new MethodArgumentWriter(span.Slice(4));
_method.WriteArgumentsTo(ref argWriter);
return 4 + argWriter.Offset;
}
Expand All @@ -134,7 +133,7 @@ internal override int GetMinimumPayloadBufferSize()
return 0;
}

internal override int WritePayload(Memory<byte> memory)
internal override int WritePayload(Span<byte> span)
{
return 0;
}
Expand All @@ -151,17 +150,16 @@ protected OutboundFrame(FrameType type, int channel)
Channel = channel;
}

internal void WriteTo(Memory<byte> memory)
internal void WriteTo(Span<byte> span)
{
var span = memory.Span;
span[0] = (byte)Type;
NetworkOrderSerializer.WriteUInt16(span.Slice(1), (ushort)Channel);
int bytesWritten = WritePayload(memory.Slice(7));
int bytesWritten = WritePayload(span.Slice(7));
NetworkOrderSerializer.WriteUInt32(span.Slice(3), (uint)bytesWritten);
span[bytesWritten + 7] = Constants.FrameEnd;
}

internal abstract int WritePayload(Memory<byte> memory);
internal abstract int WritePayload(Span<byte> span);
internal abstract int GetMinimumPayloadBufferSize();
internal int GetMinimumBufferSize()
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/MainSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public override void HandleFrame(in InboundFrame frame)

if (!_closeServerInitiated && frame.IsMethod())
{
MethodBase method = Connection.Protocol.DecodeMethodFrom(frame.Payload);
MethodBase method = Connection.Protocol.DecodeMethodFrom(frame.Payload.Span);
if ((method.ProtocolClassId == _closeClassId)
&& (method.ProtocolMethodId == _closeMethodId))
{
Expand Down
11 changes: 4 additions & 7 deletions projects/RabbitMQ.Client/client/impl/MethodArgumentReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,16 @@ namespace RabbitMQ.Client.Impl
{
internal ref struct MethodArgumentReader
{
private readonly ReadOnlyMemory<byte> _memory;
private readonly ReadOnlySpan<byte> _span;
private int _offset;
private int _bitMask;
private int _bits;

private ReadOnlySpan<byte> Span => _span.Slice(_offset);
private ReadOnlyMemory<byte> Memory => _memory.Slice(_offset);

public MethodArgumentReader(ReadOnlyMemory<byte> memory)
public MethodArgumentReader(ReadOnlySpan<byte> span)
{
_memory = memory;
_span = memory.Span;
_span = span;
_offset = 0;
_bitMask = 0;
_bits = 0;
Expand Down Expand Up @@ -119,14 +116,14 @@ public ushort ReadShort()

public string ReadShortstr()
{
string result = WireFormatting.ReadShortstr(Memory, out int bytesRead);
string result = WireFormatting.ReadShortstr(Span, out int bytesRead);
_offset += bytesRead;
return result;
}

public Dictionary<string, object> ReadTable()
{
Dictionary<string, object> result = WireFormatting.ReadTable(Memory, out int bytesRead);
Dictionary<string, object> result = WireFormatting.ReadTable(Span, out int bytesRead);
_offset += bytesRead;
return result;
}
Expand Down
13 changes: 5 additions & 8 deletions projects/RabbitMQ.Client/client/impl/MethodArgumentWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ namespace RabbitMQ.Client.Impl
{
internal ref struct MethodArgumentWriter
{
private readonly Memory<byte> _memory;
private readonly Span<byte> _span;
private int _offset;
private int _bitAccumulator;
Expand All @@ -55,12 +54,10 @@ internal ref struct MethodArgumentWriter
public int Offset => _offset;

private Span<byte> Span => _span.Slice(_offset);
private Memory<byte> Memory => _memory.Slice(_offset);

public MethodArgumentWriter(Memory<byte> memory)
public MethodArgumentWriter(Span<byte> span)
{
_memory = memory;
_span = memory.Span;
_span = span;
_offset = 0;
_bitAccumulator = 0;
_bitMask = 1;
Expand Down Expand Up @@ -117,17 +114,17 @@ public void WriteShort(ushort val)

public void WriteShortstr(string val)
{
_offset += WireFormatting.WriteShortstr(Memory, val);
_offset += WireFormatting.WriteShortstr(Span, val);
}

public void WriteTable(IDictionary val)
{
_offset += WireFormatting.WriteTable(Memory, val);
_offset += WireFormatting.WriteTable(Span, val);
}

public void WriteTable(IDictionary<string, object> val)
{
_offset += WireFormatting.WriteTable(Memory, val);
_offset += WireFormatting.WriteTable(Span, val);
}

public void WriteTimestamp(AmqpTimestamp val)
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/ProtocolBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void CreateConnectionClose(ushort reasonCode,
}

internal abstract ContentHeaderBase DecodeContentHeaderFrom(ushort classId);
internal abstract MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> reader);
internal abstract MethodBase DecodeMethodFrom(ReadOnlySpan<byte> reader);

public override bool Equals(object obj)
{
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/QuiescingSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public override void HandleFrame(in InboundFrame frame)
{
if (frame.IsMethod())
{
MethodBase method = Connection.Protocol.DecodeMethodFrom(frame.Payload);
MethodBase method = Connection.Protocol.DecodeMethodFrom(frame.Payload.Span);
if ((method.ProtocolClassId == ClassConstants.Channel)
&& (method.ProtocolMethodId == ChannelMethodConstants.CloseOk))
{
Expand Down
3 changes: 1 addition & 2 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,7 @@ public async Task WriteFrameImpl()
{
int bufferSize = frame.GetMinimumBufferSize();
byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize);
Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize);
frame.WriteTo(slice);
frame.WriteTo(new Span<byte>(memoryArray, 0, bufferSize));
_writer.Write(memoryArray, 0, bufferSize);
ArrayPool<byte>.Shared.Return(memoryArray);
}
Expand Down
Loading

0 comments on commit d0d3032

Please sign in to comment.