Skip to content

Commit

Permalink
Null check bug fixes and tweak accessibility of properties on `Transp…
Browse files Browse the repository at this point in the history
…ortResponse` (#144)

`DefaultResponseFactory` was throwing if the response stream was null.
This can occur when an exception is thrown when sending the request
(e.g., `HttpRequestException`), for example, when the `HttpClient`
cannot connect to the endpoint. Rather than throwing a null exception
here, we still want to return a response with the original exception
attached.

In `StreamResponse`, we must safety-check that any linked disposables
are not null before attempting to dispose of them.

The final change in `TransportResponse` is a tweak for the ingest work.
The `BulkStreamingResponse` was initially derived from the
`StreamResponse` to share behaviour. However, this causes the `Body`
property (stream) to be present on the derived type. As we are handling
stream reading internally, this is unnecessary and could produce weird
behaviour if the consumer tries to access the stream directly. Instead,
`BulkStreamingResponse` derives directly from `TransportResponse`,
overriding `LeaveOpen` and handling `LinkedDisposables` in its own
`Dispose` method.

This means we could potentially seal `StreamResponse` again. However, it
might still be helpful for consumers to derive responses from this for
advanced scenarios, with the base class doing the right thing during
disposal. I am open to thoughts on whether that's likely to happen.
@flobernd, were you deriving from this in the client?
  • Loading branch information
stevejgordon authored Nov 21, 2024
1 parent efe6b75 commit 1bd2db0
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ internal static void SetCommonAttributes(Activity? activity, ITransportConfigura
}

var productSchemaVersion = string.Empty;
foreach (var attribute in activity.TagObjects)
{
if (attribute.Key.Equals(OpenTelemetryAttributes.DbElasticsearchSchemaUrl, StringComparison.Ordinal))
{
if (attribute.Value is string schemaVersion)
productSchemaVersion = schemaVersion;
}
}

// We add the client schema version only when it differs from the product schema version
if (!productSchemaVersion.Equals(OpenTelemetrySchemaVersion, StringComparison.Ordinal))
Expand Down
7 changes: 4 additions & 3 deletions src/Elastic.Transport/DistributedTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(

if (activity is { IsAllDataRequested: true })
{
if (activity.IsAllDataRequested)
OpenTelemetry.SetCommonAttributes(activity, Configuration);

if (Configuration.Authentication is BasicAuthentication basicAuthentication)
activity.SetTag(SemanticConventions.DbUser, basicAuthentication.Username);

Expand Down Expand Up @@ -261,9 +258,13 @@ private async ValueTask<TResponse> RequestCoreAsync<TResponse>(
activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode);
activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes);

// We don't check IsAllDataRequested here as that's left to the consumer.
if (configureActivity is not null && activity is not null)
configureActivity.Invoke(activity);

if (activity is { IsAllDataRequested: true })
OpenTelemetry.SetCommonAttributes(activity, Configuration);

return FinalizeResponse(endpoint, boundConfiguration, data, pipeline, startedOn, auditor, seenExceptions, response);
}
finally
Expand Down
4 changes: 1 addition & 3 deletions src/Elastic.Transport/Responses/DefaultResponseFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,11 @@ private async ValueTask<TResponse> CreateCoreAsync<TResponse>(
IReadOnlyDictionary<TcpState, int>? tcpStats,
CancellationToken cancellationToken = default) where TResponse : TransportResponse, new()
{
responseStream.ThrowIfNull(nameof(responseStream));

var details = InitializeApiCallDetails(endpoint, boundConfiguration, postData, ex, statusCode, headers, contentType, threadPoolStats, tcpStats, contentLength);

TResponse? response = null;

if (MayHaveBody(statusCode, endpoint.Method, contentLength)
if (responseStream is not null && MayHaveBody(statusCode, endpoint.Method, contentLength)
&& TryResolveBuilder<TResponse>(boundConfiguration.ResponseBuilders, boundConfiguration.ProductResponseBuilders, out var builder))
{
var ownsStream = false;
Expand Down
56 changes: 11 additions & 45 deletions src/Elastic.Transport/Responses/Special/StreamResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,31 @@
namespace Elastic.Transport;

/// <summary>
/// A response that exposes the response <see cref="TransportResponse{T}.Body"/> as <see cref="Stream"/>.
/// A response that exposes the response as a <see cref="Stream"/>.
/// <para>
/// <strong>MUST</strong> be disposed after use to ensure the HTTP connection is freed for reuse.
/// </para>
/// </summary>
public class StreamResponse : TransportResponse<Stream>, IDisposable
public sealed class StreamResponse : StreamResponseBase, IDisposable
{
private bool _disposed;

/// <summary>
/// The MIME type of the response, if present.
/// </summary>
public string ContentType { get; }

/// <inheritdoc cref="StreamResponse"/>
public StreamResponse()
{
Body = Stream.Null;
public StreamResponse() : base(Stream.Null) =>
ContentType = string.Empty;
}

/// <inheritdoc cref="StreamResponse"/>
public StreamResponse(Stream body, string? contentType)
{
Body = body;
public StreamResponse(Stream body, string? contentType) : base(body) =>
ContentType = contentType ?? string.Empty;
}

internal override bool LeaveOpen => true;

/// <summary>
/// Disposes the underlying stream.
/// The MIME type of the response, if present.
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
Body.Dispose();

if (LinkedDisposables is not null)
{
foreach (var disposable in LinkedDisposables)
disposable.Dispose();
}
}

_disposed = true;
}
}
public string ContentType { get; }

/// <summary>
/// Disposes the underlying stream.
/// The raw response stream.
/// </summary>
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
public Stream Body => Stream;

/// <inheritdoc/>
protected internal override bool LeaveOpen => true;
}
70 changes: 70 additions & 0 deletions src/Elastic.Transport/Responses/Special/StreamResponseBase.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using System;
using System.IO;
using Elastic.Transport.Extensions;

namespace Elastic.Transport;

/// <summary>
/// A base class for implementing responses that access the raw response stream.
/// </summary>
public abstract class StreamResponseBase : TransportResponse, IDisposable
{
/// <inheritdoc/>
protected internal override bool LeaveOpen => true;

/// <summary>
/// The raw response stream from the HTTP layer.
/// </summary>
/// <remarks>
/// <b>MUST</b> be disposed to release the underlying HTTP connection for reuse.
/// </remarks>
protected Stream Stream { get; }

/// <summary>
/// Indicates that the response has been disposed and it is not longer safe to access the stream.
/// </summary>
protected bool Disposed { get; private set; }

/// <inheritdoc cref="StreamResponseBase"/>
public StreamResponseBase(Stream responseStream)
{
responseStream.ThrowIfNull(nameof(responseStream));
Stream = responseStream;
}

/// <summary>
/// Disposes the underlying stream.
/// </summary>
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!Disposed)
{
if (disposing)
{
Stream?.Dispose();

if (LinkedDisposables is not null)
{
foreach (var disposable in LinkedDisposables)
disposable?.Dispose();
}
}

Disposed = true;
}
}

/// <summary>
/// Disposes the underlying stream.
/// </summary>
public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}
}
8 changes: 4 additions & 4 deletions src/Elastic.Transport/Responses/TransportResponse.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ namespace Elastic.Transport;

/// <summary>
/// A response from an Elastic product including details about the request/response life cycle. Base class for the built in low level response
/// types, <see cref="StringResponse"/>, <see cref="BytesResponse"/>, <see cref="DynamicResponse"/>, <see cref="StreamResponse"/> and <see cref="VoidResponse"/>
/// types, <see cref="StringResponse"/>, <see cref="BytesResponse"/>, <see cref="DynamicResponse"/>, and <see cref="VoidResponse"/>
/// </summary>
public abstract class TransportResponse<T> : TransportResponse
{
/// <summary>
/// The deserialized body returned by the product.
/// The (potentially deserialized) response returned by the product.
/// </summary>
public T Body { get; protected internal set; }
}
Expand Down Expand Up @@ -46,7 +46,7 @@ public override string ToString() => ApiCallDetails?.DebugInformation
/// StreamResponse and kept internal. If we later make this public, we might need to refine this.
/// </remarks>
[JsonIgnore]
internal IEnumerable<IDisposable>? LinkedDisposables { get; set; }
protected internal IEnumerable<IDisposable>? LinkedDisposables { get; internal set; }

/// <summary>
/// Allows the response to identify that the response stream should NOT be automatically disposed.
Expand All @@ -55,6 +55,6 @@ public override string ToString() => ApiCallDetails?.DebugInformation
/// Currently only used by StreamResponse and therefore internal.
/// </remarks>
[JsonIgnore]
internal virtual bool LeaveOpen { get; } = false;
protected internal virtual bool LeaveOpen { get; } = false;
}

0 comments on commit 1bd2db0

Please sign in to comment.