Skip to content

Commit

Permalink
Fix MultiJson when direct streaming is disabled (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
dawust authored Dec 19, 2024
1 parent 6dc8ff9 commit ec5f19d
Showing 1 changed file with 15 additions and 12 deletions.
27 changes: 15 additions & 12 deletions src/Elastic.Transport/Requests/Body/PostData.MultiJson.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public override void Write(Stream writableStream, ITransportConfiguration settin
$"{nameof(PostDataMultiJson<T>)} only does not support {nameof(PostType)}.{Type.GetStringValue()}");

MemoryStream? buffer = null;
var stream = writableStream;

switch (Type)
{
Expand All @@ -60,12 +61,12 @@ public override void Write(Stream writableStream, ITransportConfiguration settin
if (!enumerator.MoveNext())
return;

BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref writableStream);
BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref stream);
do
{
var bytes = enumerator.Current.Utf8Bytes();
writableStream.Write(bytes, 0, bytes.Length);
writableStream.Write(NewLineByteArray, 0, 1);
stream.Write(bytes, 0, bytes.Length);
stream.Write(NewLineByteArray, 0, 1);
} while (enumerator.MoveNext());

break;
Expand All @@ -78,12 +79,12 @@ public override void Write(Stream writableStream, ITransportConfiguration settin
if (!enumerator.MoveNext())
return;

BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref writableStream);
BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref stream);
do
{
var o = enumerator.Current;
settings.RequestResponseSerializer.Serialize(o, writableStream, SerializationFormatting.None);
writableStream.Write(NewLineByteArray, 0, 1);
settings.RequestResponseSerializer.Serialize(o, stream, SerializationFormatting.None);
stream.Write(NewLineByteArray, 0, 1);
} while (enumerator.MoveNext());

break;
Expand All @@ -102,6 +103,8 @@ public override async Task WriteAsync(Stream writableStream, ITransportConfigura
$"{nameof(PostDataMultiJson<T>)} only does not support {nameof(PostType)}.{Type.GetStringValue()}");

MemoryStream? buffer = null;
var stream = writableStream;

switch (Type)
{
case PostType.EnumerableOfString:
Expand All @@ -113,12 +116,12 @@ public override async Task WriteAsync(Stream writableStream, ITransportConfigura
if (!enumerator.MoveNext())
return;

BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref writableStream);
BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref stream);
do
{
var bytes = enumerator.Current.Utf8Bytes();
await writableStream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);
await writableStream.WriteAsync(NewLineByteArray, 0, 1, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(bytes, 0, bytes.Length, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(NewLineByteArray, 0, 1, cancellationToken).ConfigureAwait(false);
} while (enumerator.MoveNext());

break;
Expand All @@ -132,14 +135,14 @@ public override async Task WriteAsync(Stream writableStream, ITransportConfigura
if (!enumerator.MoveNext())
return;

BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref writableStream);
BufferIfNeeded(settings.MemoryStreamFactory, disableDirectStreaming, ref buffer, ref stream);
do
{
var o = enumerator.Current;
await settings.RequestResponseSerializer.SerializeAsync(o, writableStream,
await settings.RequestResponseSerializer.SerializeAsync(o, stream,
SerializationFormatting.None, cancellationToken)
.ConfigureAwait(false);
await writableStream.WriteAsync(NewLineByteArray, 0, 1, cancellationToken).ConfigureAwait(false);
await stream.WriteAsync(NewLineByteArray, 0, 1, cancellationToken).ConfigureAwait(false);
} while (enumerator.MoveNext());

break;
Expand Down

0 comments on commit ec5f19d

Please sign in to comment.