Skip to content

Commit

Permalink
refactor/request data v3 (#130)
Browse files Browse the repository at this point in the history
This PR continues #128 and ensures `RequestPipeline` can be shared over
many requests unless a local configuration is provided.

Similar to `RequestData` being shared in #128.

This includes further refactorings:

- `DateTimeProvider` is only provided externally once (on `NodePool`)
and that instance is used everywhere. Before it could be set seperately
on `NodePool` and `TransportConfiguration`, not by design but by
necessity.

- `RequestPipeline` is no longer disposable (we didn't actually disposed
anything).

- A new type exists `Auditor` this is now explicitly passed to the
methods that need it on `RequestPipeline`. It implements
`IReadOnlyCollection<Audit>` and is exposed as such to users.

- This PR also merges `DefaultRequestPipeline` and `RequestPipeline`
into one.
  • Loading branch information
Mpdreamz authored Nov 8, 2024
1 parent 0813129 commit 3352233
Show file tree
Hide file tree
Showing 42 changed files with 830 additions and 978 deletions.
2 changes: 1 addition & 1 deletion src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu

public IEnumerable<Diagnostics.Auditing.Audit> AsyncAuditTrail { get; set; }
public IEnumerable<Diagnostics.Auditing.Audit> AuditTrail { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; set; }
public Func<Components.VirtualizedCluster> Cluster { get; }

public TransportResponse Response { get; internal set; }
public TransportResponse ResponseAsync { get; internal set; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,16 @@ namespace Elastic.Transport.VirtualizedCluster.Components;
public sealed class ExposingPipelineFactory<TConfiguration> : RequestPipelineFactory
where TConfiguration : class, ITransportConfiguration
{
public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider)
public ExposingPipelineFactory(TConfiguration configuration)
{
DateTimeProvider = dateTimeProvider;
Configuration = configuration;
Pipeline = Create(new RequestData(Configuration, null), DateTimeProvider);
RequestHandler = new DistributedTransport<TConfiguration>(Configuration, this, DateTimeProvider);
Transport = new DistributedTransport<TConfiguration>(Configuration);
}

// ReSharper disable once MemberCanBePrivate.Global
public RequestPipeline Pipeline { get; }
private DateTimeProvider DateTimeProvider { get; }
private TConfiguration Configuration { get; }
public ITransport<TConfiguration> RequestHandler { get; }
public ITransport<TConfiguration> Transport { get; }

public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) =>
new DefaultRequestPipeline(requestData, DateTimeProvider);
public override RequestPipeline Create(RequestData requestData) =>
new RequestPipeline(requestData);
}
#nullable restore
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat
private TransportConfigurationDescriptor CreateSettings() =>
new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration);


/// <summary> Create the cluster using all defaults on <see cref="TransportConfigurationDescriptor"/> </summary>
public VirtualizedCluster AllDefaults() =>
new(_dateTimeProvider, CreateSettings());
new(CreateSettings());

/// <summary> Create the cluster using <paramref name="selector"/> to provide configuration changes </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualizedCluster Settings(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector) =>
new(_dateTimeProvider, selector(CreateSettings()));
new(selector(CreateSettings()));

/// <summary>
/// Allows you to create an instance of `<see cref="VirtualClusterConnection"/> using the DSL provided by <see cref="Virtual"/>
/// </summary>
/// <param name="selector">Provide custom configuration options</param>
public VirtualClusterRequestInvoker VirtualClusterConnection(Func<TransportConfigurationDescriptor, TransportConfigurationDescriptor> selector = null) =>
new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings()))
new VirtualizedCluster(selector == null ? CreateSettings() : selector(CreateSettings()))
.Connection;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable<Node> nodes, MockProductRegistration produc
InternalNodes = nodes.ToList();
}

public List<IClientCallRule> ClientCallRules { get; } = new List<IClientCallRule>();
public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider();
public List<IClientCallRule> ClientCallRules { get; } = new();
private TestableDateTimeProvider TestDateTimeProvider { get; } = new();

protected List<Node> InternalNodes { get; }
public IReadOnlyList<Node> Nodes => InternalNodes;
public List<IRule> PingingRules { get; } = new List<IRule>();
public List<IRule> PingingRules { get; } = new();

public List<ISniffRule> SniffingRules { get; } = new List<ISniffRule>();
public List<ISniffRule> SniffingRules { get; } = new();
internal string PublishAddressOverride { get; private set; }

internal bool SniffShouldReturnFqnd { get; private set; }
Expand Down Expand Up @@ -73,32 +73,34 @@ public VirtualCluster ClientCalls(Func<ClientCallRule, IClientCallRule> selector
public SealedVirtualCluster SingleNodeConnection(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StaticNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
var dateTimeProvider = TestDateTimeProvider;
var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider };
return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster SniffingNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickyNodePool(Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration);
}

public SealedVirtualCluster StickySniffingNodePool(Func<Node, float> sorter = null,
Func<IList<Node>, IEnumerable<Node>> seedNodesSelector = null
)
{
var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes;
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration);
return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ private class VirtualResponse : TransportResponse;

private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/");

internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings)
internal VirtualizedCluster(TransportConfigurationDescriptor settings)
{
_dateTimeProvider = dateTimeProvider;
_settings = settings;
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings, _dateTimeProvider);
_dateTimeProvider = ((ITransportConfiguration)_settings).DateTimeProvider as TestableDateTimeProvider
?? throw new ArgumentException("DateTime provider is not a TestableDateTimeProvider", nameof(_dateTimeProvider));
_exposingRequestPipeline = new ExposingPipelineFactory<ITransportConfiguration>(settings);

_syncCall = (t, r) => t.Request<VirtualResponse>(
path: RootPath,
Expand All @@ -50,7 +51,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport

public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.RequestInvoker as VirtualClusterRequestInvoker;
public NodePool ConnectionPool => RequestHandler.Configuration.NodePool;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.RequestHandler;
public ITransport<ITransportConfiguration> RequestHandler => _exposingRequestPipeline?.Transport;

public VirtualizedCluster TransportProxiesTo(
Func<ITransport<ITransportConfiguration>, Func<RequestConfigurationDescriptor, IRequestConfiguration>, TransportResponse> sync,
Expand Down
8 changes: 2 additions & 6 deletions src/Elastic.Transport/Components/NodePool/CloudNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool
/// <para> Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html</para>
/// </param>
/// <param name="credentials"></param>
/// <param name="dateTimeProvider">Optionally inject an instance of <see cref="DateTimeProvider"/> used to set <see cref="NodePool.LastUpdate"/></param>
public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) =>
public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) =>
AuthenticationHeader = credentials;

private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) =>
private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) =>
ClusterName = parsedCloudId.Name;

//TODO implement debugger display for NodePool implementations and display it there and its ToString()
Expand Down Expand Up @@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId)

return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}"));
}

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
14 changes: 6 additions & 8 deletions src/Elastic.Transport/Components/NodePool/NodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable
{
private bool _disposed;

internal NodePool() { }

/// <summary>
/// The last time that this instance was updated.
/// </summary>
public abstract DateTimeOffset LastUpdate { get; protected set; }
public abstract DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc cref="DateTimeProvider"/>>
public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default;

/// <summary>
/// Returns the default maximum retries for the connection pool implementation.
Expand Down Expand Up @@ -82,18 +83,15 @@ public void Dispose()
/// <param name="disposing"></param>
protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
_disposed = true;
}
if (!_disposed) _disposed = true;
}

/// <summary>
/// Creates a view over the nodes, with changing starting positions, that wraps over on each call
/// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1.
/// if there are no live nodes yields a different dead node to try once
/// </summary>
public abstract IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null);
public abstract IEnumerable<Node> CreateView(Auditor? auditor = null);

/// <summary>
/// Reseeds the nodes. The implementation is responsible for thread safety.
Expand Down
10 changes: 3 additions & 7 deletions src/Elastic.Transport/Components/NodePool/SingleNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ namespace Elastic.Transport;
public class SingleNodePool : NodePool
{
/// <inheritdoc cref="SingleNodePool"/>
public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public SingleNodePool(Uri uri)
{
var node = new Node(uri);
UsingSsl = node.Uri.Scheme == "https";
Nodes = new List<Node> { node };
LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now();
}

/// <inheritdoc />
public override DateTimeOffset LastUpdate { get; protected set; }
public override DateTimeOffset? LastUpdate { get; protected set; }

/// <inheritdoc />
public override int MaxRetries => 0;
Expand All @@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null)
public override bool UsingSsl { get; protected set; }

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null) => Nodes;
public override IEnumerable<Node> CreateView(Auditor? auditor) => Nodes;

/// <inheritdoc />
public override void Reseed(IEnumerable<Node> nodes) { } //ignored

/// <inheritdoc />
protected override void Dispose(bool disposing) => base.Dispose(disposing);
}
13 changes: 5 additions & 8 deletions src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool
private readonly ReaderWriterLockSlim _readerWriter = new();

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(uris, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Uri> uris, bool randomize = true) : base(uris, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null)
: base(nodes, randomize, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, bool randomize = true) : base(nodes, randomize) { }

/// <inheritdoc cref="SniffingNodePool"/>>
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer, DateTimeProvider dateTimeProvider = null)
: base(nodes, nodeScorer, dateTimeProvider) { }
public SniffingNodePool(IEnumerable<Node> nodes, Func<Node, float> nodeScorer) : base(nodes, nodeScorer) { }

/// <inheritdoc />
public override IReadOnlyCollection<Node> Nodes
Expand Down Expand Up @@ -81,12 +78,12 @@ public override void Reseed(IEnumerable<Node> nodes)
}

/// <inheritdoc />
public override IEnumerable<Node> CreateView(Action<AuditEvent, Node> audit = null)
public override IEnumerable<Node> CreateView(Auditor? auditor)
{
_readerWriter.EnterReadLock();
try
{
return base.CreateView(audit);
return base.CreateView(auditor);
}
finally
{
Expand Down
Loading

0 comments on commit 3352233

Please sign in to comment.