diff --git a/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs b/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs index eb996fd..5b594cb 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Audit/Auditor.cs @@ -34,7 +34,7 @@ private Auditor(Components.VirtualizedCluster cluster, Components.VirtualizedClu public IEnumerable AsyncAuditTrail { get; set; } public IEnumerable AuditTrail { get; set; } - public Func Cluster { get; set; } + public Func Cluster { get; } public TransportResponse Response { get; internal set; } public TransportResponse ResponseAsync { get; internal set; } diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs b/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs index 05a746d..0622fa8 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/ExposingPipelineFactory.cs @@ -11,21 +11,16 @@ namespace Elastic.Transport.VirtualizedCluster.Components; public sealed class ExposingPipelineFactory : RequestPipelineFactory where TConfiguration : class, ITransportConfiguration { - public ExposingPipelineFactory(TConfiguration configuration, DateTimeProvider dateTimeProvider) + public ExposingPipelineFactory(TConfiguration configuration) { - DateTimeProvider = dateTimeProvider; Configuration = configuration; - Pipeline = Create(new RequestData(Configuration, null), DateTimeProvider); - RequestHandler = new DistributedTransport(Configuration, this, DateTimeProvider); + Transport = new DistributedTransport(Configuration); } - // ReSharper disable once MemberCanBePrivate.Global - public RequestPipeline Pipeline { get; } - private DateTimeProvider DateTimeProvider { get; } private TConfiguration Configuration { get; } - public ITransport RequestHandler { get; } + public ITransport Transport { get; } - public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) => - new DefaultRequestPipeline(requestData, DateTimeProvider); + public override RequestPipeline Create(RequestData requestData) => + new RequestPipeline(requestData); } #nullable restore diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs index f2b1c1b..bbe7651 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/SealedVirtualCluster.cs @@ -31,20 +31,21 @@ internal SealedVirtualCluster(VirtualCluster cluster, NodePool pool, TestableDat private TransportConfigurationDescriptor CreateSettings() => new(_nodePool, _requestInvoker, serializer: null, _productRegistration.ProductRegistration); + /// Create the cluster using all defaults on public VirtualizedCluster AllDefaults() => - new(_dateTimeProvider, CreateSettings()); + new(CreateSettings()); /// Create the cluster using to provide configuration changes /// Provide custom configuration options public VirtualizedCluster Settings(Func selector) => - new(_dateTimeProvider, selector(CreateSettings())); + new(selector(CreateSettings())); /// /// Allows you to create an instance of ` using the DSL provided by /// /// Provide custom configuration options public VirtualClusterRequestInvoker VirtualClusterConnection(Func selector = null) => - new VirtualizedCluster(_dateTimeProvider, selector == null ? CreateSettings() : selector(CreateSettings())) + new VirtualizedCluster(selector == null ? CreateSettings() : selector(CreateSettings())) .Connection; } diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs index 9f0ac77..4c1025a 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualCluster.cs @@ -19,14 +19,14 @@ protected VirtualCluster(IEnumerable nodes, MockProductRegistration produc InternalNodes = nodes.ToList(); } - public List ClientCallRules { get; } = new List(); - public TestableDateTimeProvider DateTimeProvider { get; } = new TestableDateTimeProvider(); + public List ClientCallRules { get; } = new(); + private TestableDateTimeProvider TestDateTimeProvider { get; } = new(); protected List InternalNodes { get; } public IReadOnlyList Nodes => InternalNodes; - public List PingingRules { get; } = new List(); + public List PingingRules { get; } = new(); - public List SniffingRules { get; } = new List(); + public List SniffingRules { get; } = new(); internal string PublishAddressOverride { get; private set; } internal bool SniffShouldReturnFqnd { get; private set; } @@ -73,25 +73,27 @@ public VirtualCluster ClientCalls(Func selector public SealedVirtualCluster SingleNodeConnection(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new SingleNodePool(nodes.First().Uri), TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster StaticNodePool(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new StaticNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration); + var dateTimeProvider = TestDateTimeProvider; + var nodePool = new StaticNodePool(nodes, false) { DateTimeProvider = dateTimeProvider }; + return new SealedVirtualCluster(this, nodePool , TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster SniffingNodePool(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false, DateTimeProvider), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new SniffingNodePool(nodes, false) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster StickyNodePool(Func, IEnumerable> seedNodesSelector = null) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new StickyNodePool(nodes, DateTimeProvider), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new StickyNodePool(nodes) { DateTimeProvider = TestDateTimeProvider}, TestDateTimeProvider, ProductRegistration); } public SealedVirtualCluster StickySniffingNodePool(Func sorter = null, @@ -99,6 +101,6 @@ public SealedVirtualCluster StickySniffingNodePool(Func sorter = nu ) { var nodes = seedNodesSelector?.Invoke(InternalNodes) ?? InternalNodes; - return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter, DateTimeProvider), DateTimeProvider, ProductRegistration); + return new SealedVirtualCluster(this, new StickySniffingNodePool(nodes, sorter) { DateTimeProvider = TestDateTimeProvider }, TestDateTimeProvider, ProductRegistration); } } diff --git a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs index 20074fe..6d8d5ff 100644 --- a/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs +++ b/src/Elastic.Transport.VirtualizedCluster/Components/VirtualizedCluster.cs @@ -22,11 +22,12 @@ private class VirtualResponse : TransportResponse; private static readonly EndpointPath RootPath = new(HttpMethod.GET, "/"); - internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, TransportConfigurationDescriptor settings) + internal VirtualizedCluster(TransportConfigurationDescriptor settings) { - _dateTimeProvider = dateTimeProvider; _settings = settings; - _exposingRequestPipeline = new ExposingPipelineFactory(settings, _dateTimeProvider); + _dateTimeProvider = ((ITransportConfiguration)_settings).DateTimeProvider as TestableDateTimeProvider + ?? throw new ArgumentException("DateTime provider is not a TestableDateTimeProvider", nameof(_dateTimeProvider)); + _exposingRequestPipeline = new ExposingPipelineFactory(settings); _syncCall = (t, r) => t.Request( path: RootPath, @@ -50,7 +51,7 @@ internal VirtualizedCluster(TestableDateTimeProvider dateTimeProvider, Transport public VirtualClusterRequestInvoker Connection => RequestHandler.Configuration.RequestInvoker as VirtualClusterRequestInvoker; public NodePool ConnectionPool => RequestHandler.Configuration.NodePool; - public ITransport RequestHandler => _exposingRequestPipeline?.RequestHandler; + public ITransport RequestHandler => _exposingRequestPipeline?.Transport; public VirtualizedCluster TransportProxiesTo( Func, Func, TransportResponse> sync, diff --git a/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs b/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs index 54282d9..15cff9c 100644 --- a/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/CloudNodePool.cs @@ -36,11 +36,10 @@ public sealed class CloudNodePool : SingleNodePool /// Read more here: https://www.elastic.co/guide/en/cloud/current/ec-cloud-id.html /// /// - /// Optionally inject an instance of used to set - public CloudNodePool(string cloudId, AuthorizationHeader credentials, DateTimeProvider dateTimeProvider = null) : this(ParseCloudId(cloudId), dateTimeProvider) => + public CloudNodePool(string cloudId, AuthorizationHeader credentials) : this(ParseCloudId(cloudId)) => AuthenticationHeader = credentials; - private CloudNodePool(ParsedCloudId parsedCloudId, DateTimeProvider dateTimeProvider = null) : base(parsedCloudId.Uri, dateTimeProvider) => + private CloudNodePool(ParsedCloudId parsedCloudId) : base(parsedCloudId.Uri) => ClusterName = parsedCloudId.Name; //TODO implement debugger display for NodePool implementations and display it there and its ToString() @@ -92,7 +91,4 @@ private static ParsedCloudId ParseCloudId(string cloudId) return new ParsedCloudId(clusterName, new Uri($"https://{elasticsearchUuid}.{domainName}")); } - - /// - protected override void Dispose(bool disposing) => base.Dispose(disposing); } diff --git a/src/Elastic.Transport/Components/NodePool/NodePool.cs b/src/Elastic.Transport/Components/NodePool/NodePool.cs index 97bcc07..592ec5d 100644 --- a/src/Elastic.Transport/Components/NodePool/NodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/NodePool.cs @@ -20,12 +20,13 @@ public abstract class NodePool : IDisposable { private bool _disposed; - internal NodePool() { } - /// /// The last time that this instance was updated. /// - public abstract DateTimeOffset LastUpdate { get; protected set; } + public abstract DateTimeOffset? LastUpdate { get; protected set; } + + /// > + public DateTimeProvider DateTimeProvider { get; set; } = DefaultDateTimeProvider.Default; /// /// Returns the default maximum retries for the connection pool implementation. @@ -82,10 +83,7 @@ public void Dispose() /// protected virtual void Dispose(bool disposing) { - if (!_disposed) - { - _disposed = true; - } + if (!_disposed) _disposed = true; } /// @@ -93,7 +91,7 @@ protected virtual void Dispose(bool disposing) /// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1. /// if there are no live nodes yields a different dead node to try once /// - public abstract IEnumerable CreateView(Action audit = null); + public abstract IEnumerable CreateView(Auditor? auditor = null); /// /// Reseeds the nodes. The implementation is responsible for thread safety. diff --git a/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs b/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs index b1e0424..1d5462a 100644 --- a/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/SingleNodePool.cs @@ -12,16 +12,15 @@ namespace Elastic.Transport; public class SingleNodePool : NodePool { /// - public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null) + public SingleNodePool(Uri uri) { var node = new Node(uri); UsingSsl = node.Uri.Scheme == "https"; Nodes = new List { node }; - LastUpdate = (dateTimeProvider ?? DefaultDateTimeProvider.Default).Now(); } /// - public override DateTimeOffset LastUpdate { get; protected set; } + public override DateTimeOffset? LastUpdate { get; protected set; } /// public override int MaxRetries => 0; @@ -39,11 +38,8 @@ public SingleNodePool(Uri uri, DateTimeProvider dateTimeProvider = null) public override bool UsingSsl { get; protected set; } /// - public override IEnumerable CreateView(Action audit = null) => Nodes; + public override IEnumerable CreateView(Auditor? auditor) => Nodes; /// public override void Reseed(IEnumerable nodes) { } //ignored - - /// - protected override void Dispose(bool disposing) => base.Dispose(disposing); } diff --git a/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs b/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs index 29ebbbb..1c996f5 100644 --- a/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/SniffingNodePool.cs @@ -22,16 +22,13 @@ public class SniffingNodePool : StaticNodePool private readonly ReaderWriterLockSlim _readerWriter = new(); /// > - public SniffingNodePool(IEnumerable uris, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : base(uris, randomize, dateTimeProvider) { } + public SniffingNodePool(IEnumerable uris, bool randomize = true) : base(uris, randomize) { } /// > - public SniffingNodePool(IEnumerable nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : base(nodes, randomize, dateTimeProvider) { } + public SniffingNodePool(IEnumerable nodes, bool randomize = true) : base(nodes, randomize) { } /// > - public SniffingNodePool(IEnumerable nodes, Func nodeScorer, DateTimeProvider dateTimeProvider = null) - : base(nodes, nodeScorer, dateTimeProvider) { } + public SniffingNodePool(IEnumerable nodes, Func nodeScorer) : base(nodes, nodeScorer) { } /// public override IReadOnlyCollection Nodes @@ -81,12 +78,12 @@ public override void Reseed(IEnumerable nodes) } /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { _readerWriter.EnterReadLock(); try { - return base.CreateView(audit); + return base.CreateView(auditor); } finally { diff --git a/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs b/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs index 560fbbf..95a77dc 100644 --- a/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/StaticNodePool.cs @@ -28,37 +28,36 @@ public class StaticNodePool : NodePool private readonly Func _nodeScorer; /// - public StaticNodePool(IEnumerable uris, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : this(uris.Select(uri => new Node(uri)), randomize, null, dateTimeProvider) { } + public StaticNodePool(IEnumerable uris, bool randomize = true) + : this(uris.Select(uri => new Node(uri)), randomize, null) { } /// - public StaticNodePool(IEnumerable nodes, bool randomize = true, DateTimeProvider dateTimeProvider = null) - : this(nodes, randomize, null, dateTimeProvider) { } + public StaticNodePool(IEnumerable nodes, bool randomize = true) + : this(nodes, randomize, null) { } /// - protected StaticNodePool(IEnumerable nodes, bool randomize, int? randomizeSeed = null, DateTimeProvider dateTimeProvider = null) + protected StaticNodePool(IEnumerable nodes, bool randomize, int? randomizeSeed = null) { Randomize = randomize; Random = !randomize || !randomizeSeed.HasValue ? new Random() : new Random(randomizeSeed.Value); - Initialize(nodes, dateTimeProvider); + Initialize(nodes); } //this constructor is protected because nodeScorer only makes sense on subclasses that support reseeding otherwise just manually sort `nodes` before instantiating. /// - protected StaticNodePool(IEnumerable nodes, Func nodeScorer = null, DateTimeProvider dateTimeProvider = null) + protected StaticNodePool(IEnumerable nodes, Func nodeScorer = null) { _nodeScorer = nodeScorer; - Initialize(nodes, dateTimeProvider); + Initialize(nodes); } - private void Initialize(IEnumerable nodes, DateTimeProvider dateTimeProvider) + private void Initialize(IEnumerable nodes) { var nodesProvided = nodes?.ToList() ?? throw new ArgumentNullException(nameof(nodes)); nodesProvided.ThrowIfEmpty(nameof(nodes)); - DateTimeProvider = dateTimeProvider ?? Elastic.Transport.DefaultDateTimeProvider.Default; string scheme = null; foreach (var node in nodesProvided) @@ -76,11 +75,10 @@ private void Initialize(IEnumerable nodes, DateTimeProvider dateTimeProvid InternalNodes = SortNodes(nodesProvided) .DistinctByCustom(n => n.Uri) .ToList(); - LastUpdate = DateTimeProvider.Now(); } /// - public override DateTimeOffset LastUpdate { get; protected set; } + public override DateTimeOffset? LastUpdate { get; protected set; } /// public override int MaxRetries => InternalNodes.Count - 1; @@ -112,9 +110,6 @@ protected IReadOnlyList AliveNodes } } - /// > - protected DateTimeProvider DateTimeProvider { get; private set; } - /// /// The list of nodes we are operating over. This is protected so that subclasses that DO implement /// can update this list. Its up to subclasses to make this thread safe. @@ -137,7 +132,7 @@ protected IReadOnlyList AliveNodes /// e.g Thread A might get 1,2,3,4,5 and thread B will get 2,3,4,5,1. /// if there are no live nodes yields a different dead node to try once /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { var nodes = AliveNodes; @@ -146,13 +141,13 @@ public override IEnumerable CreateView(Action audit = nu if (nodes.Count == 0) { //could not find a suitable node retrying on first node off globalCursor - yield return RetryInternalNodes(globalCursor, audit); + yield return RetryInternalNodes(globalCursor, auditor); yield break; } var localCursor = globalCursor % nodes.Count; - foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit)) yield return aliveNode; + foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor)) yield return aliveNode; } /// @@ -164,14 +159,13 @@ public override void Reseed(IEnumerable nodes) { } //ignored /// /// /// - /// Trace action to document the fact all nodes were dead and were resurrecting one at random - protected Node RetryInternalNodes(int globalCursor, Action audit = null) + /// Trace action to document the fact all nodes were dead and were resurrecting one at random + protected Node RetryInternalNodes(int globalCursor, Auditor? auditor = null) { - audit?.Invoke(AuditEvent.AllNodesDead, null); + auditor?.Emit(AuditEvent.AllNodesDead); var node = InternalNodes[globalCursor % InternalNodes.Count]; node.IsResurrected = true; - audit?.Invoke(AuditEvent.Resurrection, node); - + auditor?.Emit(AuditEvent.Resurrection, node); return node; } @@ -181,8 +175,8 @@ protected Node RetryInternalNodes(int globalCursor, Action aud /// /// The starting point into from wich to start. /// - /// Trace action to notify if a resurrection occured - protected static IEnumerable SelectAliveNodes(int cursor, IReadOnlyList aliveNodes, Action audit = null) + /// Trace action to notify if a resurrection occured + protected static IEnumerable SelectAliveNodes(int cursor, IReadOnlyList aliveNodes, Auditor? auditor = null) { // ReSharper disable once ForCanBeConvertedToForeach for (var attempts = 0; attempts < aliveNodes.Count; attempts++) @@ -192,7 +186,7 @@ protected static IEnumerable SelectAliveNodes(int cursor, IReadOnlyList SortNodes(IEnumerable nodes) => ? nodes.OrderByDescending(_nodeScorer) : nodes.OrderBy(n => Randomize ? Random.Next() : 1); - /// - protected override void Dispose(bool disposing) => base.Dispose(disposing); } diff --git a/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs b/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs index fab319e..8f40cf8 100644 --- a/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/StickyNodePool.cs @@ -16,15 +16,13 @@ namespace Elastic.Transport; public sealed class StickyNodePool : StaticNodePool { /// - public StickyNodePool(IEnumerable uris, DateTimeProvider dateTimeProvider = null) - : base(uris, false, dateTimeProvider) { } + public StickyNodePool(IEnumerable uris) : base(uris, false) { } /// - public StickyNodePool(IEnumerable nodes, DateTimeProvider dateTimeProvider = null) - : base(nodes, false, dateTimeProvider) { } + public StickyNodePool(IEnumerable nodes) : base(nodes, false) { } /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { var nodes = AliveNodes; @@ -33,7 +31,7 @@ public override IEnumerable CreateView(Action audit = nu var globalCursor = Interlocked.Increment(ref GlobalCursor); //could not find a suitable node retrying on first node off globalCursor - yield return RetryInternalNodes(globalCursor, audit); + yield return RetryInternalNodes(globalCursor, auditor); yield break; } @@ -44,7 +42,7 @@ public override IEnumerable CreateView(Action audit = nu Interlocked.Exchange(ref GlobalCursor, -1); var localCursor = 0; - foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit)) + foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor)) yield return aliveNode; } diff --git a/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs b/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs index e4344f4..fd8f201 100644 --- a/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs +++ b/src/Elastic.Transport/Components/NodePool/StickySniffingNodePool.cs @@ -17,12 +17,12 @@ namespace Elastic.Transport; public sealed class StickySniffingNodePool : SniffingNodePool { /// - public StickySniffingNodePool(IEnumerable uris, Func nodeScorer, DateTimeProvider dateTimeProvider = null) - : base(uris.Select(uri => new Node(uri)), nodeScorer ?? DefaultNodeScore, dateTimeProvider) { } + public StickySniffingNodePool(IEnumerable uris, Func nodeScorer) + : base(uris.Select(uri => new Node(uri)), nodeScorer ?? DefaultNodeScore) { } /// - public StickySniffingNodePool(IEnumerable nodes, Func nodeScorer, DateTimeProvider dateTimeProvider = null) - : base(nodes, nodeScorer ?? DefaultNodeScore, dateTimeProvider) { } + public StickySniffingNodePool(IEnumerable nodes, Func nodeScorer) + : base(nodes, nodeScorer ?? DefaultNodeScore) { } /// public override bool SupportsPinging => true; @@ -31,7 +31,7 @@ public StickySniffingNodePool(IEnumerable nodes, Func nodeSco public override bool SupportsReseeding => true; /// - public override IEnumerable CreateView(Action audit = null) + public override IEnumerable CreateView(Auditor? auditor) { var nodes = AliveNodes; @@ -40,7 +40,7 @@ public override IEnumerable CreateView(Action audit = nu var globalCursor = Interlocked.Increment(ref GlobalCursor); //could not find a suitable node retrying on first node off globalCursor - yield return RetryInternalNodes(globalCursor, audit); + yield return RetryInternalNodes(globalCursor, auditor); yield break; } @@ -51,7 +51,7 @@ public override IEnumerable CreateView(Action audit = nu Interlocked.Exchange(ref GlobalCursor, -1); var localCursor = 0; - foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, audit)) + foreach (var aliveNode in SelectAliveNodes(localCursor, nodes, auditor)) yield return aliveNode; } diff --git a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs deleted file mode 100644 index 4b405a1..0000000 --- a/src/Elastic.Transport/Components/Pipeline/DefaultRequestPipeline.cs +++ /dev/null @@ -1,556 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; -using Elastic.Transport.Diagnostics.Auditing; -using Elastic.Transport.Extensions; -using Elastic.Transport.Products; -using static Elastic.Transport.Diagnostics.Auditing.AuditEvent; - -namespace Elastic.Transport; - -/// -public class DefaultRequestPipeline : RequestPipeline -{ - private readonly IRequestInvoker _requestInvoker; - private readonly NodePool _nodePool; - private readonly RequestData _requestData; - private readonly DateTimeProvider _dateTimeProvider; - private readonly MemoryStreamFactory _memoryStreamFactory; - private readonly Func _nodePredicate; - private readonly ProductRegistration _productRegistration; - private RequestConfiguration? _pingAndSniffRequestConfiguration; - private List? _auditTrail; - private readonly ITransportConfiguration _settings; - - /// - internal DefaultRequestPipeline(RequestData requestData, DateTimeProvider dateTimeProvider) - { - _requestData = requestData; - _settings = requestData.ConnectionSettings; - _nodePool = requestData.ConnectionSettings.NodePool; - _requestInvoker = requestData.ConnectionSettings.RequestInvoker; - _dateTimeProvider = dateTimeProvider; - _memoryStreamFactory = requestData.MemoryStreamFactory; - _productRegistration = requestData.ConnectionSettings.ProductRegistration; - _nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate; - - StartedOn = dateTimeProvider.Now(); - } - - /// - public override IEnumerable AuditTrail => _auditTrail; - - private RequestConfiguration PingAndSniffRequestConfiguration - { - // Lazily loaded when first required, since not all node pools and configurations support pinging and sniffing. - // This avoids allocating 192B per request for those which do not need to ping or sniff. - get - { - if (_pingAndSniffRequestConfiguration is not null) return _pingAndSniffRequestConfiguration; - - _pingAndSniffRequestConfiguration = new RequestConfiguration - { - PingTimeout = PingTimeout, - RequestTimeout = PingTimeout, - Authentication = _requestData.AuthenticationHeader, - EnableHttpPipelining = _requestData.HttpPipeliningEnabled, - ForceNode = _requestData.ForceNode - }; - - return _pingAndSniffRequestConfiguration; - } - } - - //TODO xmldocs -#pragma warning disable 1591 - public bool DepletedRetries => Retried >= MaxRetries + 1 || IsTakingTooLong; - - public override bool FirstPoolUsageNeedsSniffing => - !RequestDisabledSniff - && _nodePool.SupportsReseeding && _settings.SniffsOnStartup && !_nodePool.SniffedOnStartup; - - public override bool IsTakingTooLong - { - get - { - var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout); - var now = _dateTimeProvider.Now(); - - //we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort. - var margin = timeout.TotalMilliseconds / 100.0 * 98; - var marginTimeSpan = TimeSpan.FromMilliseconds(margin); - var timespanCall = now - StartedOn; - var tookToLong = timespanCall >= marginTimeSpan; - return tookToLong; - } - } - - public override int MaxRetries => _requestData.MaxRetries; - - public bool Refresh { get; private set; } - - public int Retried { get; private set; } - - public IEnumerable SniffNodes => _nodePool - .CreateView(LazyAuditable) - .ToList() - .OrderBy(n => _productRegistration.SniffOrder(n)); - - public override bool SniffsOnConnectionFailure => - !RequestDisabledSniff - && _nodePool.SupportsReseeding && _settings.SniffsOnConnectionFault; - - public override bool SniffsOnStaleCluster => - !RequestDisabledSniff - && _nodePool.SupportsReseeding && _settings.SniffInformationLifeSpan.HasValue; - - public override bool StaleClusterState - { - get - { - if (!SniffsOnStaleCluster) return false; - - // ReSharper disable once PossibleInvalidOperationException - // already checked by SniffsOnStaleCluster - var sniffLifeSpan = _settings.SniffInformationLifeSpan.Value; - - var now = _dateTimeProvider.Now(); - var lastSniff = _nodePool.LastUpdate; - - return sniffLifeSpan < now - lastSniff; - } - } - - public override DateTimeOffset StartedOn { get; } - - private TimeSpan PingTimeout => _requestData.PingTimeout; - - private bool RequestDisabledSniff => _requestData.DisableSniff; - - private TimeSpan RequestTimeout => _requestData.RequestTimeout; - - public override void AuditCancellationRequested() => Audit(CancellationRequested)?.Dispose(); - - public override void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception) - { - if (response == null) - { - //make sure we copy over the error body in case we disabled direct streaming. - var s = callDetails?.ResponseBodyInBytes == null ? Stream.Null : _memoryStreamFactory.Create(callDetails.ResponseBodyInBytes); - var m = callDetails?.ResponseContentType ?? RequestData.DefaultContentType; - response = _requestInvoker.ResponseFactory.Create(endpoint, data, postData, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null); - } - - response.ApiCallDetails.AuditTrail = AuditTrail; - } - - public override TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData) - => CallProductEndpointCoreAsync(false, endpoint, requestData, postData).EnsureCompleted(); - - public override Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default) - => CallProductEndpointCoreAsync(true, endpoint, requestData, postData, cancellationToken).AsTask(); - - private async ValueTask CallProductEndpointCoreAsync(bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken = default) - where TResponse : TransportResponse, new() - { - using var audit = Audit(HealthyResponse, endpoint.Node); - - if (audit is not null) - audit.PathAndQuery = endpoint.PathAndQuery; - - try - { - TResponse response; - - if (isAsync) - response = await _requestInvoker.RequestAsync(endpoint, requestData, postData, cancellationToken).ConfigureAwait(false); - else - response = _requestInvoker.Request(endpoint, requestData, postData); - - response.ApiCallDetails.AuditTrail = AuditTrail; - - ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails, response); - - if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType && audit is not null) - { - var @event = response.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; - audit.Event = @event; - } - - return response; - } - catch (Exception e) when (audit is not null) - { - var @event = e is TransportException t && t.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; - audit.Event = @event; - audit.Exception = e; - throw; - } - } - - public override TransportException? CreateClientException( - TResponse response, - ApiCallDetails? callDetails, - Endpoint endpoint, - RequestData data, - List? seenExceptions - ) - { - if (callDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) return null; - - var pipelineFailure = callDetails?.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; - var innerException = callDetails?.OriginalException; - if (seenExceptions is not null && seenExceptions.HasAny(out var exs)) - { - pipelineFailure = exs.Last().FailureReason; - innerException = exs.AsAggregateOrFirst(); - } - - var statusCode = callDetails?.HttpStatusCode != null ? callDetails.HttpStatusCode.Value.ToString() : "unknown"; - var resource = callDetails == null - ? "unknown resource" - : $"Status code {statusCode} from: {callDetails.HttpMethod} {callDetails.Uri.PathAndQuery}"; - - var exceptionMessage = innerException?.Message ?? "Request failed to execute"; - - if (IsTakingTooLong) - { - pipelineFailure = PipelineFailure.MaxTimeoutReached; - Audit(MaxTimeoutReached); - exceptionMessage = "Maximum timeout reached while retrying request"; - } - else if (Retried >= MaxRetries && MaxRetries > 0) - { - pipelineFailure = PipelineFailure.MaxRetriesReached; - Audit(MaxRetriesReached); - exceptionMessage = "Maximum number of retries reached"; - - var now = _dateTimeProvider.Now(); - var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now); - if (Retried >= activeNodes) - { - Audit(FailedOverAllNodes); - exceptionMessage += ", failed over to all the known alive nodes before failing"; - } - } - - exceptionMessage += !exceptionMessage.EndsWith(".", StringComparison.Ordinal) ? $". Call: {resource}" : $" Call: {resource}"; - if (response != null && _productRegistration.TryGetServerErrorReason(response, out var reason)) - exceptionMessage += $". ServerError: {reason}"; - - var clientException = new TransportException(pipelineFailure, exceptionMessage, innerException) - { - Endpoint = endpoint, - ApiCallDetails = callDetails, - AuditTrail = AuditTrail - }; - - return clientException; - } - - public override void FirstPoolUsage(SemaphoreSlim semaphore) - { - if (!FirstPoolUsageNeedsSniffing) return; - - if (!semaphore.Wait(RequestTimeout)) - { - if (FirstPoolUsageNeedsSniffing) - throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); - - return; - } - - if (!FirstPoolUsageNeedsSniffing) - { - semaphore.Release(); - return; - } - - try - { - using (Audit(SniffOnStartup)) - { - Sniff(); - _nodePool.MarkAsSniffed(); - } - } - finally - { - semaphore.Release(); - } - } - - public override async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken) - { - if (!FirstPoolUsageNeedsSniffing) return; - - // TODO cancellationToken could throw here and will bubble out as OperationCancelledException - // everywhere else it would bubble out wrapped in a `UnexpectedTransportException` - var success = await semaphore.WaitAsync(RequestTimeout, cancellationToken).ConfigureAwait(false); - if (!success) - { - if (FirstPoolUsageNeedsSniffing) - throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); - - return; - } - - if (!FirstPoolUsageNeedsSniffing) - { - semaphore.Release(); - return; - } - try - { - using (Audit(SniffOnStartup)) - { - await SniffAsync(cancellationToken).ConfigureAwait(false); - _nodePool.MarkAsSniffed(); - } - } - finally - { - semaphore.Release(); - } - } - - public override void MarkAlive(Node node) => node.MarkAlive(); - - public override void MarkDead(Node node) - { - var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout); - node.MarkDead(deadUntil); - Retried++; - } - - /// - public override bool TryGetSingleNode(out Node node) - { - if (_nodePool.Nodes.Count <= 1 && _nodePool.MaxRetries <= _nodePool.Nodes.Count && - !_nodePool.SupportsPinging && !_nodePool.SupportsReseeding) - { - node = _nodePool.Nodes.FirstOrDefault(); - - if (node is not null && _nodePredicate(node)) return true; - } - - node = null; - return false; - } - - public override IEnumerable NextNode() - { - if (_requestData.ForceNode != null) - { - yield return new Node(_requestData.ForceNode); - - yield break; - } - - //This for loop allows to break out of the view state machine if we need to - //force a refresh (after reseeding node pool). We have a hardcoded limit of only - //allowing 100 of these refreshes per call - var refreshed = false; - for (var i = 0; i < 100; i++) - { - if (DepletedRetries) yield break; - - foreach (var node in _nodePool.CreateView(LazyAuditable)) - { - if (DepletedRetries) break; - - if (!_nodePredicate(node)) continue; - - yield return node; - - if (!Refresh) continue; - - Refresh = false; - refreshed = true; - break; - } - //unless a refresh was requested we will not iterate over more then a single view. - //keep in mind refreshes are also still bound to overall maxretry count/timeout. - if (!refreshed) break; - } - } - - public override void Ping(Node node) => PingCoreAsync(false, node).EnsureCompleted(); - - public override Task PingAsync(Node node, CancellationToken cancellationToken = default) - => PingCoreAsync(true, node, cancellationToken).AsTask(); - - public async ValueTask PingCoreAsync(bool isAsync, Node node, CancellationToken cancellationToken = default) - { - if (!_productRegistration.SupportsPing) return; - if (PingDisabled(node)) return; - - var pingEndpoint = _productRegistration.CreatePingEndpoint(node, PingAndSniffRequestConfiguration); - - using var audit = Audit(PingSuccess, node); - - if (audit is not null) - audit.PathAndQuery = pingEndpoint.PathAndQuery; - - TransportResponse response; - - try - { - if (isAsync) - response = await _productRegistration.PingAsync(_requestInvoker, pingEndpoint, _requestData, cancellationToken).ConfigureAwait(false); - else - response = _productRegistration.Ping(_requestInvoker, pingEndpoint, _requestData); - - ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails); - - //ping should not silently accept bad but valid http responses - if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) - { - var pipelineFailure = response.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; - throw new PipelineException(pipelineFailure, response.ApiCallDetails.OriginalException) { Response = response }; - } - } - catch (Exception e) - { - response = (e as PipelineException)?.Response; - if (audit is not null) - { - audit.Event = PingFailure; - audit.Exception = e; - } - throw new PipelineException(PipelineFailure.PingFailure, e) { Response = response }; - } - } - - public override void Sniff() => SniffCoreAsync(false).EnsureCompleted(); - - public override Task SniffAsync(CancellationToken cancellationToken = default) - => SniffCoreAsync(true, cancellationToken).AsTask(); - - public async ValueTask SniffCoreAsync(bool isAsync, CancellationToken cancellationToken = default) - { - if (!_productRegistration.SupportsSniff) return; - - var exceptions = new List(); - - foreach (var node in SniffNodes) - { - var sniffEndpoint = _productRegistration.CreateSniffEndpoint(node, PingAndSniffRequestConfiguration, _settings); - - //TODO remove - var requestData = new RequestData(_settings, null); - - using var audit = Audit(SniffSuccess, node); - - if (audit is not null) - audit.PathAndQuery = sniffEndpoint.PathAndQuery; - - Tuple> result; - - try - { - if (isAsync) - result = await _productRegistration - .SniffAsync(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, requestData, cancellationToken) - .ConfigureAwait(false); - else - result = _productRegistration - .Sniff(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, requestData); - - ThrowBadAuthPipelineExceptionWhenNeeded(result.Item1.ApiCallDetails); - - //sniff should not silently accept bad but valid http responses - if (!result.Item1.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) - { - var pipelineFailure = result.Item1.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; - throw new PipelineException(pipelineFailure, result.Item1.ApiCallDetails.OriginalException) { Response = result.Item1 }; - } - - _nodePool.Reseed(result.Item2); - Refresh = true; - - return; - } - catch (Exception e) - { - if (audit is not null) - { - audit.Event = SniffFailure; - audit.Exception = e; - } - exceptions.Add(e); - } - - throw new PipelineException(PipelineFailure.SniffFailure, exceptions.AsAggregateOrFirst()); - } - } - - public override void SniffOnConnectionFailure() - { - if (!SniffsOnConnectionFailure) return; - - using (Audit(SniffOnFail)) - Sniff(); - } - - public override async Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken) - { - if (!SniffsOnConnectionFailure) return; - - using (Audit(SniffOnFail)) - await SniffAsync(cancellationToken).ConfigureAwait(false); - } - - public override void SniffOnStaleCluster() - { - if (!StaleClusterState) return; - - using (Audit(AuditEvent.SniffOnStaleCluster)) - { - Sniff(); - _nodePool.MarkAsSniffed(); - } - } - - public override async Task SniffOnStaleClusterAsync(CancellationToken cancellationToken) - { - if (!StaleClusterState) return; - - using (Audit(AuditEvent.SniffOnStaleCluster)) - { - await SniffAsync(cancellationToken).ConfigureAwait(false); - _nodePool.MarkAsSniffed(); - } - } - - public override void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions) - { - var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage, (Exception)null); - using (Audit(NoNodesAttempted)) - throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = AuditTrail }; - } - - private bool PingDisabled(Node node) => _requestData.DisablePings || !node.IsResurrected; - - private Auditable? Audit(AuditEvent type, Node node = null) => - !_settings.DisableAuditTrail ?? true ? new(type, ref _auditTrail, _dateTimeProvider, node) : null; - - private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse response = null) - { - if (details?.HttpStatusCode == 401) - throw new PipelineException(PipelineFailure.BadAuthentication, details.OriginalException) { Response = response }; - } - - private void LazyAuditable(AuditEvent e, Node n) - { - using (new Auditable(e, ref _auditTrail, _dateTimeProvider, n)) { } - } -} -#pragma warning restore 1591 diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs index 37f92e2..ca68411 100644 --- a/src/Elastic.Transport/Components/Pipeline/PipelineException.cs +++ b/src/Elastic.Transport/Components/Pipeline/PipelineException.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport; /// -/// A pipeline exception is throw when ever a known failing exit point is reached in +/// A pipeline exception is throw when ever a known failing exit point is reached in /// See for known exits points /// public class PipelineException : Exception diff --git a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs index 4312188..7252d6e 100644 --- a/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs +++ b/src/Elastic.Transport/Components/Pipeline/PipelineFailure.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport; /// -/// A failure in 's workflow that caused it to end prematurely. +/// A failure in 's workflow that caused it to end prematurely. /// public enum PipelineFailure { @@ -43,7 +43,7 @@ public enum PipelineFailure MaxRetriesReached, /// - /// An exception occurred during that could not be handled + /// An exception occurred during that could not be handled /// Unexpected, diff --git a/src/Elastic.Transport/Components/Pipeline/RequestData.cs b/src/Elastic.Transport/Components/Pipeline/RequestData.cs index 44792e7..eeb44c8 100644 --- a/src/Elastic.Transport/Components/Pipeline/RequestData.cs +++ b/src/Elastic.Transport/Components/Pipeline/RequestData.cs @@ -7,6 +7,7 @@ using System.Collections.Specialized; using System.Security.Cryptography.X509Certificates; using Elastic.Transport.Extensions; +using Elastic.Transport.Products; namespace Elastic.Transport; @@ -40,6 +41,9 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local ProxyPassword = global.ProxyPassword; DisableAutomaticProxyDetection = global.DisableAutomaticProxyDetection; UserAgent = global.UserAgent; + ResponseBuilders = global.ResponseBuilders; + ProductResponseBuilders = global.ProductRegistration.ResponseBuilders; + KeepAliveInterval = (int)(global.KeepAliveInterval?.TotalMilliseconds ?? 2000); KeepAliveTime = (int)(global.KeepAliveTime?.TotalMilliseconds ?? 2000); RunAs = local?.RunAs ?? global.RunAs; @@ -88,6 +92,12 @@ public RequestData(ITransportConfiguration global, IRequestConfiguration? local } } + /// + public IReadOnlyCollection ProductResponseBuilders { get; } + + /// + public IReadOnlyCollection ResponseBuilders { get; } + /// public MemoryStreamFactory MemoryStreamFactory { get; } /// diff --git a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs index 99343b3..cfb287c 100644 --- a/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs +++ b/src/Elastic.Transport/Components/Pipeline/RequestPipeline.cs @@ -4,129 +4,555 @@ using System; using System.Collections.Generic; +using System.IO; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Elastic.Transport.Diagnostics.Auditing; +using Elastic.Transport.Extensions; +using Elastic.Transport.Products; +using static Elastic.Transport.Diagnostics.Auditing.AuditEvent; namespace Elastic.Transport; -/// /// Models the workflow of a request to multiple nodes -/// -public abstract class RequestPipeline : IDisposable +public class RequestPipeline { - private bool _disposed; + private readonly IRequestInvoker _requestInvoker; + private readonly NodePool _nodePool; + private readonly RequestData _requestData; + private readonly DateTimeProvider _dateTimeProvider; + private readonly MemoryStreamFactory _memoryStreamFactory; + private readonly Func _nodePredicate; + private readonly ProductRegistration _productRegistration; + + private RequestConfiguration? _pingAndSniffRequestConfiguration; + //private List? _auditTrail; + private readonly ITransportConfiguration _settings; + + /// + internal RequestPipeline(RequestData requestData) + { + _requestData = requestData; + _settings = requestData.ConnectionSettings; + + _nodePool = requestData.ConnectionSettings.NodePool; + _requestInvoker = requestData.ConnectionSettings.RequestInvoker; + _dateTimeProvider = requestData.ConnectionSettings.DateTimeProvider; + _memoryStreamFactory = requestData.MemoryStreamFactory; + _productRegistration = requestData.ConnectionSettings.ProductRegistration; + _nodePredicate = requestData.ConnectionSettings.NodePredicate ?? _productRegistration.NodePredicate; + } + + + private RequestConfiguration PingAndSniffRequestConfiguration + { + // Lazily loaded when first required, since not all node pools and configurations support pinging and sniffing. + // This avoids allocating 192B per request for those which do not need to ping or sniff. + get + { + if (_pingAndSniffRequestConfiguration is not null) return _pingAndSniffRequestConfiguration; + + _pingAndSniffRequestConfiguration = new RequestConfiguration + { + PingTimeout = PingTimeout, + RequestTimeout = PingTimeout, + Authentication = _requestData.AuthenticationHeader, + EnableHttpPipelining = _requestData.HttpPipeliningEnabled, + ForceNode = _requestData.ForceNode + }; + + return _pingAndSniffRequestConfiguration; + } + } + + private bool DepletedRetries(DateTimeOffset startedOn) => Retried >= MaxRetries + 1 || IsTakingTooLong(startedOn); + + private bool FirstPoolUsageNeedsSniffing => + !RequestDisabledSniff + && _nodePool.SupportsReseeding && _settings.SniffsOnStartup && !_nodePool.SniffedOnStartup; + + private bool IsTakingTooLong(DateTimeOffset startedOn) + { + var timeout = _settings.MaxRetryTimeout.GetValueOrDefault(RequestTimeout); + var now = _dateTimeProvider.Now(); + + //we apply a soft margin so that if a request times out at 59 seconds when the maximum is 60 we also abort. + var margin = timeout.TotalMilliseconds / 100.0 * 98; + var marginTimeSpan = TimeSpan.FromMilliseconds(margin); + var timespanCall = now - startedOn; + var tookToLong = timespanCall >= marginTimeSpan; + return tookToLong; + } + + private int MaxRetries => _requestData.MaxRetries; + + private bool Refresh { get; set; } + + private int Retried { get; set; } + + private IEnumerable SniffNodes(Auditor? auditor) => _nodePool + .CreateView(auditor) + .ToList() + .OrderBy(n => _productRegistration.SniffOrder(n)); - internal RequestPipeline() { } + private bool SniffsOnConnectionFailure => + !RequestDisabledSniff + && _nodePool.SupportsReseeding && _settings.SniffsOnConnectionFault; - /// - /// An audit trail that can be used for logging and debugging purposes. Giving insights into how - /// the request made its way through the workflow - /// - public abstract IEnumerable AuditTrail { get; } + private bool SniffsOnStaleCluster => + !RequestDisabledSniff + && _nodePool.SupportsReseeding && _settings.SniffInformationLifeSpan.HasValue; - /// - /// Should the workflow attempt the initial sniff as requested by - /// - /// - public abstract bool FirstPoolUsageNeedsSniffing { get; } + private bool StaleClusterState + { + get + { + if (!SniffsOnStaleCluster) return false; - //TODO xmldocs -#pragma warning disable 1591 - public abstract bool IsTakingTooLong { get; } + // ReSharper disable once PossibleInvalidOperationException + // already checked by SniffsOnStaleCluster + var sniffLifeSpan = _settings.SniffInformationLifeSpan.Value; - public abstract int MaxRetries { get; } + var now = _dateTimeProvider.Now(); + var lastSniff = _nodePool.LastUpdate; - public abstract bool SniffsOnConnectionFailure { get; } + return sniffLifeSpan < now - lastSniff; + } + } - public abstract bool SniffsOnStaleCluster { get; } + private TimeSpan PingTimeout => _requestData.PingTimeout; - public abstract bool StaleClusterState { get; } + private bool RequestDisabledSniff => _requestData.DisableSniff; - public abstract DateTimeOffset StartedOn { get; } + private TimeSpan RequestTimeout => _requestData.RequestTimeout; - public abstract TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData) - where TResponse : TransportResponse, new(); + /// Emit event + public void AuditCancellationRequested(Auditor? auditor) => auditor?.Emit(CancellationRequested); - public abstract Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, CancellationToken cancellationToken) - where TResponse : TransportResponse, new(); + /// Ensures a response is returned with + public void BadResponse( + ref TResponse? response, + ApiCallDetails? callDetails, + Endpoint endpoint, + RequestData data, + PostData? postData, + TransportException exception, + IReadOnlyCollection? auditTrail + ) + where TResponse : TransportResponse, new() + { + if (response == null) + { + //make sure we copy over the error body in case we disabled direct streaming. + var s = callDetails?.ResponseBodyInBytes == null ? Stream.Null : _memoryStreamFactory.Create(callDetails.ResponseBodyInBytes); + var m = callDetails?.ResponseContentType ?? RequestData.DefaultContentType; + response = _requestInvoker.ResponseFactory.Create(endpoint, data, postData, exception, callDetails?.HttpStatusCode, null, s, m, callDetails?.ResponseBodyInBytes?.Length ?? -1, null, null); + } - public abstract void MarkAlive(Node node); + response.ApiCallDetails.AuditTrail = auditTrail; + } - public abstract void MarkDead(Node node); + /// Call the product's API endpoint ensuring rich enough exceptions are thrown + public TResponse CallProductEndpoint(Endpoint endpoint, RequestData requestData, PostData? postData, Auditor? auditor) + where TResponse : TransportResponse, new() + => CallProductEndpointCoreAsync(false, endpoint, requestData, postData, auditor).EnsureCompleted(); - /// - /// Attempt to get a single node when the underlying connection pool contains only one node. - /// - /// This provides an optimised path for single node pools by avoiding an Enumerator on each call. - /// - /// - /// - /// true when a single node exists which has been set on the . - public abstract bool TryGetSingleNode(out Node node); + /// Call the product's API endpoint ensuring rich enough exceptions are thrown + public Task CallProductEndpointAsync(Endpoint endpoint, RequestData requestData, PostData? postData, Auditor? auditor, CancellationToken cancellationToken = default) + where TResponse : TransportResponse, new() + => CallProductEndpointCoreAsync(true, endpoint, requestData, postData, auditor, cancellationToken).AsTask(); - public abstract IEnumerable NextNode(); + private async ValueTask CallProductEndpointCoreAsync( + bool isAsync, Endpoint endpoint, RequestData requestData, PostData? postData, Auditor? auditor, CancellationToken cancellationToken = default) + where TResponse : TransportResponse, new() + { + using var audit = auditor?.Add(HealthyResponse, _dateTimeProvider, endpoint.Node); - public abstract void Ping(Node node); + try + { + TResponse response; - public abstract Task PingAsync(Node node, CancellationToken cancellationToken); + if (isAsync) + response = await _requestInvoker.RequestAsync(endpoint, requestData, postData, cancellationToken).ConfigureAwait(false); + else + response = _requestInvoker.Request(endpoint, requestData, postData); - public abstract void FirstPoolUsage(SemaphoreSlim semaphore); + response.ApiCallDetails.AuditTrail = auditor; - public abstract Task FirstPoolUsageAsync(SemaphoreSlim semaphore, CancellationToken cancellationToken); + ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails, response); - public abstract void Sniff(); + if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType && audit is not null) + { + var @event = response.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; + audit.Event = @event; + } - public abstract Task SniffAsync(CancellationToken cancellationToken); + return response; + } + catch (Exception e) when (audit is not null) + { + var @event = e is TransportException t && t.ApiCallDetails.HttpStatusCode != null ? AuditEvent.BadResponse : BadRequest; + audit.Event = @event; + audit.Exception = e; + throw; + } + } - public abstract void SniffOnStaleCluster(); + /// Create a rich enough + public TransportException? CreateClientException( + TResponse response, + ApiCallDetails? callDetails, + Endpoint endpoint, + Auditor? auditor, + DateTimeOffset startedOn, + List? seenExceptions + ) + where TResponse : TransportResponse, new() + { + if (callDetails?.HasSuccessfulStatusCodeAndExpectedContentType ?? false) return null; - public abstract Task SniffOnStaleClusterAsync(CancellationToken cancellationToken); + var pipelineFailure = callDetails?.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; + var innerException = callDetails?.OriginalException; + if (seenExceptions is not null && seenExceptions.HasAny(out var exs)) + { + pipelineFailure = exs.Last().FailureReason; + innerException = exs.AsAggregateOrFirst(); + } - public abstract void SniffOnConnectionFailure(); + var statusCode = callDetails?.HttpStatusCode != null ? callDetails.HttpStatusCode.Value.ToString() : "unknown"; + var resource = callDetails == null + ? "unknown resource" + : $"Status code {statusCode} from: {callDetails.HttpMethod} {callDetails.Uri.PathAndQuery}"; - public abstract Task SniffOnConnectionFailureAsync(CancellationToken cancellationToken); + var exceptionMessage = innerException?.Message ?? "Request failed to execute"; - public abstract void BadResponse(ref TResponse response, ApiCallDetails callDetails, Endpoint endpoint, RequestData data, PostData? postData, TransportException exception) - where TResponse : TransportResponse, new(); + if (IsTakingTooLong(startedOn)) + { + pipelineFailure = PipelineFailure.MaxTimeoutReached; + auditor?.Emit(MaxTimeoutReached); + exceptionMessage = "Maximum timeout reached while retrying request"; + } + else if (Retried >= MaxRetries && MaxRetries > 0) + { + pipelineFailure = PipelineFailure.MaxRetriesReached; + auditor?.Emit(MaxRetriesReached); + exceptionMessage = "Maximum number of retries reached"; + + var now = _dateTimeProvider.Now(); + var activeNodes = _nodePool.Nodes.Count(n => n.IsAlive || n.DeadUntil <= now); + if (Retried >= activeNodes) + { + auditor?.Emit(FailedOverAllNodes); + exceptionMessage += ", failed over to all the known alive nodes before failing"; + } + } - public abstract void ThrowNoNodesAttempted(Endpoint endpoint, List? seenExceptions); + exceptionMessage += !exceptionMessage.EndsWith(".", StringComparison.Ordinal) ? $". Call: {resource}" : $" Call: {resource}"; + if (response != null && _productRegistration.TryGetServerErrorReason(response, out var reason)) + exceptionMessage += $". ServerError: {reason}"; - public abstract void AuditCancellationRequested(); + var clientException = new TransportException(pipelineFailure, exceptionMessage, innerException) + { + Endpoint = endpoint, + ApiCallDetails = callDetails, + AuditTrail = auditor + }; - public abstract TransportException? CreateClientException(TResponse? response, ApiCallDetails? callDetails, - Endpoint endpoint, RequestData data, List? seenExceptions) - where TResponse : TransportResponse, new(); -#pragma warning restore 1591 + return clientException; + } - /// - /// - /// - public void Dispose() + /// Routine for the first call into the product, potentially sniffing to discover the network topology + public void FirstPoolUsage(SemaphoreSlim semaphore, Auditor? auditor) { - Dispose(disposing: true); - GC.SuppressFinalize(this); + if (!FirstPoolUsageNeedsSniffing) return; + + if (!semaphore.Wait(RequestTimeout)) + { + if (FirstPoolUsageNeedsSniffing) + throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); + + return; + } + + if (!FirstPoolUsageNeedsSniffing) + { + semaphore.Release(); + return; + } + + try + { + using (auditor?.Add(SniffOnStartup, _dateTimeProvider)) + { + Sniff(auditor); + _nodePool.MarkAsSniffed(); + } + } + finally + { + semaphore.Release(); + } } - /// - /// - /// - /// - protected virtual void Dispose(bool disposing) + /// + public async Task FirstPoolUsageAsync(SemaphoreSlim semaphore, Auditor? auditor, CancellationToken cancellationToken) { - if (_disposed) + if (!FirstPoolUsageNeedsSniffing) return; + + // TODO cancellationToken could throw here and will bubble out as OperationCancelledException + // everywhere else it would bubble out wrapped in a `UnexpectedTransportException` + var success = await semaphore.WaitAsync(RequestTimeout, cancellationToken).ConfigureAwait(false); + if (!success) + { + if (FirstPoolUsageNeedsSniffing) + throw new PipelineException(PipelineFailure.CouldNotStartSniffOnStartup, null); + return; + } - if (disposing) + if (!FirstPoolUsageNeedsSniffing) { - DisposeManagedResources(); + semaphore.Release(); + return; } + try + { + using (auditor?.Add(SniffOnStartup, _dateTimeProvider)) + { + await SniffAsync(auditor, cancellationToken).ConfigureAwait(false); + _nodePool.MarkAsSniffed(); + } + } + finally + { + semaphore.Release(); + } + } - _disposed = true; + /// Mark as alive putting it back in rotation. + public void MarkAlive(Node node) => node.MarkAlive(); + + /// Mark as dead, taking it out of rotation. + public void MarkDead(Node node) + { + var deadUntil = _dateTimeProvider.DeadTime(node.FailedAttempts, _settings.DeadTimeout, _settings.MaxDeadTimeout); + node.MarkDead(deadUntil); + Retried++; } - /// - /// - /// - protected virtual void DisposeManagedResources() { } + /// Fast path for if only a single node could ever be yielded this save an IEnumerator allocation + public bool TryGetSingleNode(out Node? node) + { + if (_nodePool.Nodes.Count <= 1 && _nodePool.MaxRetries <= _nodePool.Nodes.Count && + !_nodePool.SupportsPinging && !_nodePool.SupportsReseeding) + { + node = _nodePool.Nodes.FirstOrDefault(); + + if (node is not null && _nodePredicate(node)) return true; + } + + node = null; + return false; + } + + /// returns a consistent enumerable view into the available nodes + public IEnumerable NextNode(DateTimeOffset startedOn, Auditor? auditor) + { + if (_requestData.ForceNode != null) + { + yield return new Node(_requestData.ForceNode); + + yield break; + } + + //This for loop allows to break out of the view state machine if we need to + //force a refresh (after reseeding node pool). We have a hardcoded limit of only + //allowing 100 of these refreshes per call + var refreshed = false; + for (var i = 0; i < 100; i++) + { + if (DepletedRetries(startedOn)) yield break; + + foreach (var node in _nodePool.CreateView(auditor)) + { + if (DepletedRetries(startedOn)) break; + + if (!_nodePredicate(node)) continue; + + yield return node; + + if (!Refresh) continue; + + Refresh = false; + refreshed = true; + break; + } + //unless a refresh was requested we will not iterate over more then a single view. + //keep in mind refreshes are also still bound to overall maxretry count/timeout. + if (!refreshed) break; + } + } + + /// ping as a fast path ensuring its alive + public void Ping(Node node, Auditor? auditor) => PingCoreAsync(false, node, auditor).EnsureCompleted(); + + /// ping as a fast path ensuring its alive + public Task PingAsync(Node node, Auditor? auditor, CancellationToken cancellationToken = default) + => PingCoreAsync(true, node, auditor, cancellationToken).AsTask(); + + private async ValueTask PingCoreAsync(bool isAsync, Node node, Auditor? auditor, CancellationToken cancellationToken = default) + { + if (!_productRegistration.SupportsPing) return; + if (PingDisabled(node)) return; + + var pingEndpoint = _productRegistration.CreatePingEndpoint(node, PingAndSniffRequestConfiguration); + + using var audit = auditor?.Add(PingSuccess, _dateTimeProvider, node); + + TransportResponse response; + + try + { + if (isAsync) + response = await _productRegistration.PingAsync(_requestInvoker, pingEndpoint, _requestData, cancellationToken).ConfigureAwait(false); + else + response = _productRegistration.Ping(_requestInvoker, pingEndpoint, _requestData); + + ThrowBadAuthPipelineExceptionWhenNeeded(response.ApiCallDetails); + + //ping should not silently accept bad but valid http responses + if (!response.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) + { + var pipelineFailure = response.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; + throw new PipelineException(pipelineFailure, response.ApiCallDetails.OriginalException) { Response = response }; + } + } + catch (Exception e) + { + response = (e as PipelineException)?.Response; + if (audit is not null) + { + audit.Event = PingFailure; + audit.Exception = e; + } + throw new PipelineException(PipelineFailure.PingFailure, e) { Response = response }; + } + } + + /// Discover the products network topology to yield all available nodes + public void Sniff(Auditor? auditor) => SniffCoreAsync(false, auditor).EnsureCompleted(); + + /// Discover the products network topology to yield all available nodes + public Task SniffAsync(Auditor? auditor, CancellationToken cancellationToken = default) + => SniffCoreAsync(true, auditor, cancellationToken).AsTask(); + + private async ValueTask SniffCoreAsync(bool isAsync, Auditor? auditor, CancellationToken cancellationToken = default) + { + if (!_productRegistration.SupportsSniff) return; + + var exceptions = new List(); + + foreach (var node in SniffNodes(auditor)) + { + var sniffEndpoint = _productRegistration.CreateSniffEndpoint(node, PingAndSniffRequestConfiguration, _settings); + using var audit = auditor?.Add(SniffSuccess, _dateTimeProvider, node); + + Tuple> result; + + try + { + if (isAsync) + result = await _productRegistration + .SniffAsync(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, _requestData, cancellationToken) + .ConfigureAwait(false); + else + result = _productRegistration + .Sniff(_requestInvoker, _nodePool.UsingSsl, sniffEndpoint, _requestData); + + ThrowBadAuthPipelineExceptionWhenNeeded(result.Item1.ApiCallDetails); + + //sniff should not silently accept bad but valid http responses + if (!result.Item1.ApiCallDetails.HasSuccessfulStatusCodeAndExpectedContentType) + { + var pipelineFailure = result.Item1.ApiCallDetails.HttpStatusCode != null ? PipelineFailure.BadResponse : PipelineFailure.BadRequest; + throw new PipelineException(pipelineFailure, result.Item1.ApiCallDetails.OriginalException) { Response = result.Item1 }; + } + + _nodePool.Reseed(result.Item2); + Refresh = true; + + return; + } + catch (Exception e) + { + if (audit is not null) + { + audit.Event = SniffFailure; + audit.Exception = e; + } + exceptions.Add(e); + } + + throw new PipelineException(PipelineFailure.SniffFailure, exceptions.AsAggregateOrFirst()); + } + } + + /// sniff the topology when a connection failure happens + public void SniffOnConnectionFailure(Auditor? auditor) + { + if (!SniffsOnConnectionFailure) return; + + using (auditor?.Add(SniffOnFail, _dateTimeProvider)) + Sniff(auditor); + } + + /// sniff the topology when a connection failure happens + public async Task SniffOnConnectionFailureAsync(Auditor? auditor, CancellationToken cancellationToken) + { + if (!SniffsOnConnectionFailure) return; + + using (auditor?.Add(SniffOnFail, _dateTimeProvider)) + await SniffAsync(auditor, cancellationToken).ConfigureAwait(false); + } + + /// sniff the topology after a set period to ensure it's up to date + public void SniffOnStaleCluster(Auditor? auditor) + { + if (!StaleClusterState) return; + + using (auditor?.Add(AuditEvent.SniffOnStaleCluster, _dateTimeProvider)) + { + Sniff(auditor); + _nodePool.MarkAsSniffed(); + } + } + + /// sniff the topology after a set period to ensure its up to date + public async Task SniffOnStaleClusterAsync(Auditor? auditor, CancellationToken cancellationToken) + { + if (!StaleClusterState) return; + + using (auditor?.Add(AuditEvent.SniffOnStaleCluster, _dateTimeProvider)) + { + await SniffAsync(auditor, cancellationToken).ConfigureAwait(false); + _nodePool.MarkAsSniffed(); + } + } + + /// emit event in case no nodes were available + public void ThrowNoNodesAttempted(Endpoint endpoint, Auditor? auditor, List? seenExceptions) + { + var clientException = new TransportException(PipelineFailure.NoNodesAttempted, RequestPipelineStatics.NoNodesAttemptedMessage); + using (auditor?.Add(NoNodesAttempted, _dateTimeProvider)) + throw new UnexpectedTransportException(clientException, seenExceptions) { Endpoint = endpoint, AuditTrail = auditor }; + } + + private bool PingDisabled(Node node) => _requestData.DisablePings || !node.IsResurrected; + + private static void ThrowBadAuthPipelineExceptionWhenNeeded(ApiCallDetails details, TransportResponse? response = null) + { + if (details.HttpStatusCode == 401) + throw new PipelineException(PipelineFailure.BadAuthentication, details.OriginalException) { Response = response }; + } } diff --git a/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs b/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs index f9be33b..399381d 100644 --- a/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs +++ b/src/Elastic.Transport/Components/Providers/DefaultRequestPipelineFactory.cs @@ -5,13 +5,14 @@ namespace Elastic.Transport; /// -/// The default implementation for that returns +/// The default implementation for that returns /// internal sealed class DefaultRequestPipelineFactory : RequestPipelineFactory { + public static readonly DefaultRequestPipelineFactory Default = new (); /// - /// returns instances of + /// returns instances of /// - public override RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider) => - new DefaultRequestPipeline(requestData, dateTimeProvider); + public override RequestPipeline Create(RequestData requestData) => + new RequestPipeline(requestData); } diff --git a/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs b/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs index ff4ef38..ee0d6ff 100644 --- a/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs +++ b/src/Elastic.Transport/Components/Providers/RequestPipelineFactory.cs @@ -10,5 +10,5 @@ public abstract class RequestPipelineFactory internal RequestPipelineFactory() { } /// Create an instance of - public abstract RequestPipeline Create(RequestData requestData, DateTimeProvider dateTimeProvider); + public abstract RequestPipeline Create(RequestData requestData); } diff --git a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker-FullFramework.cs b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker-FullFramework.cs index 68c7d05..89e316d 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker-FullFramework.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker-FullFramework.cs @@ -13,13 +13,7 @@ public class HttpRequestInvoker : HttpWebRequestInvoker /// /// Create a new instance of the . /// - public HttpRequestInvoker() : base() { } - - /// - /// Create a new instance of the . - /// - /// The from which response builders can be loaded. - public HttpRequestInvoker(ITransportConfiguration transportConfiguration) : base(transportConfiguration) { } + public HttpRequestInvoker() { } /// The default TransportClient implementation. Uses on the current .NET desktop framework. internal HttpRequestInvoker(ResponseFactory responseFactory) : base(responseFactory) { } diff --git a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs index c6ec90c..afb6023 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpRequestInvoker.cs @@ -37,14 +37,7 @@ public class HttpRequestInvoker : IRequestInvoker /// /// Create a new instance of the . /// - public HttpRequestInvoker() : this(new TransportConfiguration()) { } - - /// - /// Create a new instance of the . - /// - /// The from which response builders can be loaded. - public HttpRequestInvoker(ITransportConfiguration transportConfiguration) : - this(new DefaultResponseFactory(transportConfiguration)) { } + public HttpRequestInvoker() : this(new DefaultResponseFactory()) { } internal HttpRequestInvoker(ResponseFactory responseFactory) { @@ -56,13 +49,13 @@ internal HttpRequestInvoker(ResponseFactory responseFactory) /// Allows consumers to inject their own HttpMessageHandler, and optionally call our default implementation. /// public HttpRequestInvoker(Func wrappingHandler) : - this(wrappingHandler, new DefaultResponseFactory(new TransportConfiguration())) { } + this(wrappingHandler, new DefaultResponseFactory()) { } /// /// Allows consumers to inject their own HttpMessageHandler, and optionally call our default implementation. /// public HttpRequestInvoker(Func wrappingHandler, ITransportConfiguration transportConfiguration) : - this(wrappingHandler, new DefaultResponseFactory(transportConfiguration)) + this(wrappingHandler, new DefaultResponseFactory()) { } internal HttpRequestInvoker(Func wrappingHandler, ResponseFactory responseFactory) diff --git a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs index 98eb2ff..dd97e3c 100644 --- a/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/HttpWebRequestInvoker.cs @@ -45,15 +45,7 @@ static HttpWebRequestInvoker() /// /// Create a new instance of the . /// - public HttpWebRequestInvoker() : this(new TransportConfiguration()) { } - - /// - /// Create a new instance of the . - /// - /// The from which response builders can be loaded. - public HttpWebRequestInvoker(ITransportConfiguration transportConfiguration) : - this(new DefaultResponseFactory(transportConfiguration)) - { } + public HttpWebRequestInvoker() : this(new DefaultResponseFactory()) { } internal HttpWebRequestInvoker(ResponseFactory responseFactory) => ResponseFactory = responseFactory; diff --git a/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs b/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs index c487d35..090fbc0 100644 --- a/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs +++ b/src/Elastic.Transport/Components/TransportClient/InMemoryRequestInvoker.cs @@ -29,15 +29,11 @@ public class InMemoryRequestInvoker : IRequestInvoker /// Every request will succeed with this overload, note that it won't actually return mocked responses /// so using this overload might fail if you are using it to test high level bits that need to deserialize the response. /// - public InMemoryRequestInvoker() : this(null) { } - - /// - public InMemoryRequestInvoker(ProductRegistration? productRegistration) + public InMemoryRequestInvoker() { _statusCode = 200; - productRegistration ??= DefaultProductRegistration.Default; - ResponseFactory = new DefaultResponseFactory(new TransportConfiguration(null, productRegistration)); + ResponseFactory = new DefaultResponseFactory(); } /// @@ -49,7 +45,7 @@ public InMemoryRequestInvoker(byte[] responseBody, int statusCode = 200, Excepti _contentType = contentType; _headers = headers; - ResponseFactory = new DefaultResponseFactory(new TransportConfiguration(null, DefaultProductRegistration.Default)); + ResponseFactory = new DefaultResponseFactory(); } /// diff --git a/src/Elastic.Transport/Configuration/HeadersList.cs b/src/Elastic.Transport/Configuration/HeadersList.cs index a08a889..a01905d 100644 --- a/src/Elastic.Transport/Configuration/HeadersList.cs +++ b/src/Elastic.Transport/Configuration/HeadersList.cs @@ -6,6 +6,7 @@ using System.Collections; using System.Collections.Generic; using System.Linq; +using Elastic.Transport.Extensions; namespace Elastic.Transport; @@ -89,18 +90,4 @@ private void AddToHeaders(HeadersList? headers) IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); - internal struct EmptyEnumerator : IEnumerator - { - public T Current => default; - object IEnumerator.Current => Current; - public bool MoveNext() => false; - - public void Reset() - { - } - - public void Dispose() - { - } - } } diff --git a/src/Elastic.Transport/Configuration/ITransportConfiguration.cs b/src/Elastic.Transport/Configuration/ITransportConfiguration.cs index a35e21e..4cc35cc 100644 --- a/src/Elastic.Transport/Configuration/ITransportConfiguration.cs +++ b/src/Elastic.Transport/Configuration/ITransportConfiguration.cs @@ -48,6 +48,13 @@ public interface ITransportConfiguration : IRequestConfiguration, IDisposable /// ProductRegistration ProductRegistration { get; } + /// Allows you to wrap calls to , mainly for testing purposes to not have to rely + /// on the wall clock + DateTimeProvider DateTimeProvider { get; } + + /// In charge of create a new + RequestPipelineFactory PipelineProvider { get; } + /// /// The time to put dead nodes out of rotation (this will be multiplied by the number of times they've been dead) /// diff --git a/src/Elastic.Transport/Configuration/TransportConfiguration.cs b/src/Elastic.Transport/Configuration/TransportConfiguration.cs index 9ebd657..4d0c9a2 100644 --- a/src/Elastic.Transport/Configuration/TransportConfiguration.cs +++ b/src/Elastic.Transport/Configuration/TransportConfiguration.cs @@ -42,7 +42,7 @@ public record TransportConfiguration : ITransportConfiguration #pragma warning disable 1570 /// /// The default concurrent connection limit for outgoing http requests. Defaults to 80 -#if !NETFRAMEWORK /// Except for implementations based on curl, which defaults to +#if !NETFRAMEWORK /// Except for implementations based on curl, which defaults to #endif /// #pragma warning restore 1570 @@ -85,18 +85,23 @@ internal TransportConfiguration( { //non init properties NodePool = nodePool; - RequestInvoker = requestInvoker ?? new HttpRequestInvoker(this); + RequestInvoker = requestInvoker ?? new HttpRequestInvoker(); ProductRegistration = productRegistration ?? DefaultProductRegistration.Default; - Accept = productRegistration?.DefaultContentType; RequestResponseSerializer = serializer ?? new LowLevelRequestResponseSerializer(); + DateTimeProvider = nodePool.DateTimeProvider; + MetaHeaderProvider = productRegistration?.MetaHeaderProvider; + UrlFormatter = new UrlFormatter(this); + + PipelineProvider = DefaultRequestPipelineFactory.Default; + + Accept = productRegistration?.DefaultContentType; ConnectionLimit = DefaultConnectionLimit; DnsRefreshTimeout = DefaultDnsRefreshTimeout; MemoryStreamFactory = DefaultMemoryStreamFactory; SniffsOnConnectionFault = true; SniffsOnStartup = true; SniffInformationLifeSpan = TimeSpan.FromHours(1); - MetaHeaderProvider = productRegistration?.MetaHeaderProvider; - UrlFormatter = new UrlFormatter(this); + StatusCodeToResponseSuccess = ProductRegistration.HttpStatusCodeClassifier; UserAgent = UserAgent.Create(ProductRegistration.Name, ProductRegistration.GetType()); @@ -119,6 +124,9 @@ public TransportConfiguration(ITransportConfiguration config) throw new ArgumentNullException(nameof(config)); #endif + // it's important url formatter is repointed to the new instance of ITransportConfiguration + UrlFormatter = new UrlFormatter(this); + Accept = config.Accept; AllowedStatusCodes = config.AllowedStatusCodes; Authentication = config.Authentication; @@ -127,6 +135,7 @@ public TransportConfiguration(ITransportConfiguration config) ClientCertificates = config.ClientCertificates; ConnectionLimit = config.ConnectionLimit; ContentType = config.ContentType; + DateTimeProvider = config.DateTimeProvider; DeadTimeout = config.DeadTimeout; DisableAuditTrail = config.DisableAuditTrail; DisableAutomaticProxyDetection = config.DisableAutomaticProxyDetection; @@ -154,6 +163,7 @@ public TransportConfiguration(ITransportConfiguration config) OpaqueId = config.OpaqueId; ParseAllHeaders = config.ParseAllHeaders; PingTimeout = config.PingTimeout; + PipelineProvider = config.PipelineProvider; PrettyJson = config.PrettyJson; ProductRegistration = config.ProductRegistration; ProxyAddress = config.ProxyAddress; @@ -174,7 +184,6 @@ public TransportConfiguration(ITransportConfiguration config) StatusCodeToResponseSuccess = config.StatusCodeToResponseSuccess; ThrowExceptions = config.ThrowExceptions; TransferEncodingChunked = config.TransferEncodingChunked; - UrlFormatter = config.UrlFormatter; UserAgent = config.UserAgent; } @@ -204,6 +213,11 @@ public virtual bool DebugMode public IRequestInvoker RequestInvoker { get; } /// public Serializer RequestResponseSerializer { get; } + /// + public DateTimeProvider DateTimeProvider { get; } + + /// + public RequestPipelineFactory PipelineProvider { get; init; } /// // ReSharper disable UnusedAutoPropertyAccessor.Global diff --git a/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs b/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs index 4f361d2..f062331 100644 --- a/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs +++ b/src/Elastic.Transport/Configuration/TransportConfigurationDescriptor.cs @@ -69,22 +69,26 @@ public abstract class TransportConfigurationDescriptorBase : ITransportConfig protected TransportConfigurationDescriptorBase(NodePool nodePool, IRequestInvoker? requestInvoker, Serializer? requestResponseSerializer, ProductRegistration? productRegistration) { _nodePool = nodePool; - _requestInvoker = requestInvoker ?? new HttpRequestInvoker(this); + _requestInvoker = requestInvoker ?? new HttpRequestInvoker(); _productRegistration = productRegistration ?? DefaultProductRegistration.Default; - _accept = productRegistration?.DefaultContentType; - _bootstrapLock = new(1, 1); _requestResponseSerializer = requestResponseSerializer ?? new LowLevelRequestResponseSerializer(); + _pipelineProvider = DefaultRequestPipelineFactory.Default; + _dateTimeProvider = nodePool.DateTimeProvider; + _bootstrapLock = new(1, 1); + _metaHeaderProvider = productRegistration?.MetaHeaderProvider; + _urlFormatter = new UrlFormatter(this); + + _accept = productRegistration?.DefaultContentType; _connectionLimit = TransportConfiguration.DefaultConnectionLimit; _dnsRefreshTimeout = TransportConfiguration.DefaultDnsRefreshTimeout; _memoryStreamFactory = TransportConfiguration.DefaultMemoryStreamFactory; _sniffsOnConnectionFault = true; _sniffsOnStartup = true; _sniffInformationLifeSpan = TimeSpan.FromHours(1); - _metaHeaderProvider = productRegistration?.MetaHeaderProvider; - _urlFormatter = new UrlFormatter(this); + _statusCodeToResponseSuccess = _productRegistration.HttpStatusCodeClassifier; _userAgent = Transport.UserAgent.Create(_productRegistration.Name, _productRegistration.GetType()); - + if (nodePool is CloudNodePool cloudPool) { _authentication = cloudPool.AuthenticationHeader; @@ -157,6 +161,8 @@ protected TransportConfigurationDescriptorBase(NodePool nodePool, IRequestInvoke private readonly MetaHeaderProvider? _metaHeaderProvider; private HeadersList? _responseHeadersToParse; private bool? _parseAllHeaders; + private DateTimeProvider _dateTimeProvider; + private RequestPipelineFactory _pipelineProvider; private List? _responseBuilders; SemaphoreSlim ITransportConfiguration.BootstrapLock => _bootstrapLock; @@ -164,6 +170,10 @@ protected TransportConfigurationDescriptorBase(NodePool nodePool, IRequestInvoke int ITransportConfiguration.ConnectionLimit => _connectionLimit; NodePool ITransportConfiguration.NodePool => _nodePool; ProductRegistration ITransportConfiguration.ProductRegistration => _productRegistration; + + DateTimeProvider? ITransportConfiguration.DateTimeProvider => _dateTimeProvider; + RequestPipelineFactory? ITransportConfiguration.PipelineProvider => _pipelineProvider; + TimeSpan? ITransportConfiguration.DeadTimeout => _deadTimeout; bool ITransportConfiguration.DisableAutomaticProxyDetection => _disableAutomaticProxyDetection; TimeSpan? ITransportConfiguration.KeepAliveInterval => _keepAliveInterval; @@ -428,6 +438,9 @@ public T SkipDeserializationForStatusCodes(params int[] statusCodes) => /// public T MemoryStreamFactory(MemoryStreamFactory memoryStreamFactory) => Assign(memoryStreamFactory, static (a, v) => a._memoryStreamFactory = v); + /// > + public T PipelineProvider(RequestPipelineFactory provider) => Assign(provider, static (a, v) => a._pipelineProvider = v); + /// > public T EnableTcpStats(bool enableTcpStats = true) => Assign(enableTcpStats, static (a, v) => a._enableTcpStats = v); diff --git a/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs b/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs index f9dc741..874f1d6 100644 --- a/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs +++ b/src/Elastic.Transport/Diagnostics/AuditDiagnosticObserver.cs @@ -8,13 +8,13 @@ namespace Elastic.Transport.Diagnostics; -/// Provides a typed listener to events that emits +/// Provides a typed listener to events that emits internal sealed class AuditDiagnosticObserver : TypedDiagnosticObserver { /// public AuditDiagnosticObserver( Action> onNext, - Action onError = null, - Action onCompleted = null + Action? onError = null, + Action? onCompleted = null ) : base(onNext, onError, onCompleted) { } } diff --git a/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs b/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs index 53954ff..4c12482 100644 --- a/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs +++ b/src/Elastic.Transport/Diagnostics/Auditing/Audit.cs @@ -24,12 +24,7 @@ internal Audit(AuditEvent type, DateTimeOffset started) /// /// The node on which the request was made. /// - public Node Node { get; internal set; } - - /// - /// The path of the request. - /// - public string PathAndQuery { get; internal set; } + public Node? Node { get; internal init; } /// /// The end date and time of the audit. @@ -55,6 +50,8 @@ public override string ToString() var tookString = string.Empty; if (took >= TimeSpan.Zero) tookString = $" Took: {took}"; - return Node == null ? $"Event: {Event.GetStringValue()}{tookString}" : $"Event: {Event.GetStringValue()} Node: {Node?.Uri} NodeAlive: {Node?.IsAlive}Took: {tookString}"; + return Node == null + ? $"Event: {Event.GetStringValue()}{tookString}" + : $"Event: {Event.GetStringValue()} Node: {Node?.Uri} NodeAlive: {Node?.IsAlive}Took: {tookString}"; } } diff --git a/src/Elastic.Transport/Diagnostics/Auditing/AuditEventExtensions.cs b/src/Elastic.Transport/Diagnostics/Auditing/AuditEventExtensions.cs deleted file mode 100644 index 43bae54..0000000 --- a/src/Elastic.Transport/Diagnostics/Auditing/AuditEventExtensions.cs +++ /dev/null @@ -1,43 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -using System.Diagnostics; -using Elastic.Transport.Extensions; - -namespace Elastic.Transport.Diagnostics.Auditing; - -internal static class AuditEventExtensions -{ - /// - /// Returns the name of the event to be used for use in . - /// If this return null the event should not be reported on - /// This indicates this event is monitored by a different component already - /// - /// The diagnostic event name representation or null if it should go unreported - public static string GetAuditDiagnosticEventName(this AuditEvent @event) - { - switch(@event) - { - case AuditEvent.SniffFailure: - case AuditEvent.SniffSuccess: - case AuditEvent.PingFailure: - case AuditEvent.PingSuccess: - case AuditEvent.BadResponse: - case AuditEvent.HealthyResponse: - return null; - case AuditEvent.SniffOnStartup: return nameof(AuditEvent.SniffOnStartup); - case AuditEvent.SniffOnFail: return nameof(AuditEvent.SniffOnFail); - case AuditEvent.SniffOnStaleCluster: return nameof(AuditEvent.SniffOnStaleCluster); - case AuditEvent.Resurrection: return nameof(AuditEvent.Resurrection); - case AuditEvent.AllNodesDead: return nameof(AuditEvent.AllNodesDead); - case AuditEvent.MaxTimeoutReached: return nameof(AuditEvent.MaxTimeoutReached); - case AuditEvent.MaxRetriesReached: return nameof(AuditEvent.MaxRetriesReached); - case AuditEvent.BadRequest: return nameof(AuditEvent.BadRequest); - case AuditEvent.NoNodesAttempted: return nameof(AuditEvent.NoNodesAttempted); - case AuditEvent.CancellationRequested: return nameof(AuditEvent.CancellationRequested); - case AuditEvent.FailedOverAllNodes: return nameof(AuditEvent.FailedOverAllNodes); - default: return @event.GetStringValue(); //still cached but uses reflection - } - } -} diff --git a/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs b/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs index a0c40f2..d002138 100644 --- a/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs +++ b/src/Elastic.Transport/Diagnostics/Auditing/Auditable.cs @@ -3,7 +3,6 @@ // See the LICENSE file in the project root for more information using System; -using System.Collections.Generic; namespace Elastic.Transport.Diagnostics.Auditing; @@ -13,36 +12,28 @@ internal class Auditable : IDisposable private readonly DateTimeProvider _dateTimeProvider; - public Auditable(AuditEvent type, ref List auditTrail, DateTimeProvider dateTimeProvider, Node node) + public Auditable(AuditEvent type, DateTimeProvider dateTimeProvider, Node? node) { - auditTrail ??= new List(); - _dateTimeProvider = dateTimeProvider; var started = _dateTimeProvider.Now(); - _audit = new Audit(type, started) { Node = node }; - - auditTrail.Add(_audit); } public AuditEvent Event { - set => _audit.Event = value; + set => Audit.Event = value; } public Exception Exception { - set => _audit.Exception = value; + set => Audit.Exception = value; } - public string PathAndQuery - { - set => _audit.PathAndQuery = value; - } + public Audit Audit => _audit; - public void Dispose() => _audit.Ended = _dateTimeProvider.Now(); + public void Dispose() => Audit.Ended = _dateTimeProvider.Now(); } diff --git a/src/Elastic.Transport/Diagnostics/Auditing/Auditor.cs b/src/Elastic.Transport/Diagnostics/Auditing/Auditor.cs new file mode 100644 index 0000000..275d258 --- /dev/null +++ b/src/Elastic.Transport/Diagnostics/Auditing/Auditor.cs @@ -0,0 +1,48 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Collections; +using System.Collections.Generic; +using Elastic.Transport.Extensions; + +namespace Elastic.Transport.Diagnostics.Auditing; + +/// Collects events +public class Auditor : IReadOnlyCollection +{ + private readonly DateTimeProvider _dateTimeProvider; + private List? _audits; + + internal Auditor(DateTimeProvider dateTimeProvider) => _dateTimeProvider = dateTimeProvider; + + /// + public IEnumerator GetEnumerator() => + _audits?.GetEnumerator() ?? (IEnumerator)new EmptyEnumerator(); + + IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + + internal Auditable Add(Auditable auditable) + { + _audits ??= new List(); + _audits.Add(auditable.Audit); + return auditable; + } + + internal Auditable Add(AuditEvent type, DateTimeProvider dateTimeProvider, Node? node = null) + { + _audits ??= new List(); + var auditable = new Auditable(type, dateTimeProvider, node); + _audits.Add(auditable.Audit); + return auditable; + } + + /// Emits an event that does not need to track a duration + public void Emit(AuditEvent type) => Add(type, _dateTimeProvider).Dispose(); + + /// Emits an event that does not need to track a duration + public void Emit(AuditEvent type, Node node) => Add(type, _dateTimeProvider, node).Dispose(); + + /// + public int Count => _audits?.Count ?? 0; +} diff --git a/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs b/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs index 618f1e4..257d038 100644 --- a/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs +++ b/src/Elastic.Transport/Diagnostics/DiagnosticSources.cs @@ -76,7 +76,7 @@ public class SerializerDiagnosticKeys : IDiagnosticsKeys } /// - /// Provides access to the string event names that emits + /// Provides access to the string event names that emits /// public class RequestPipelineDiagnosticKeys : IDiagnosticsKeys { @@ -97,7 +97,7 @@ public class RequestPipelineDiagnosticKeys : IDiagnosticsKeys /// /// Reference to the diagnostic source name that allows you to listen to all decisions that - /// makes. Events it emits are the names on + /// makes. Events it emits are the names on /// public class AuditDiagnosticKeys : IDiagnosticsKeys { diff --git a/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs b/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs index 77f15a1..dcac354 100644 --- a/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs +++ b/src/Elastic.Transport/Diagnostics/RequestPipelineDiagnosticObserver.cs @@ -7,7 +7,7 @@ namespace Elastic.Transport.Diagnostics; -/// Provides a typed listener to actions that takes e.g sniff, ping, or making an API call ; +/// Provides a typed listener to actions that takes e.g sniff, ping, or making an API call ; internal sealed class RequestPipelineDiagnosticObserver : TypedDiagnosticObserver { /// diff --git a/src/Elastic.Transport/Diagnostics/TypedDiagnosticObserver.cs b/src/Elastic.Transport/Diagnostics/TypedDiagnosticObserver.cs index cc38b9a..0c8d1b9 100644 --- a/src/Elastic.Transport/Diagnostics/TypedDiagnosticObserver.cs +++ b/src/Elastic.Transport/Diagnostics/TypedDiagnosticObserver.cs @@ -15,17 +15,17 @@ namespace Elastic.Transport.Diagnostics; internal abstract class TypedDiagnosticObserver : IObserver> { private readonly Action> _onNext; - private readonly Action _onError; - private readonly Action _onCompleted; + private readonly Action? _onError; + private readonly Action? _onCompleted; /// protected TypedDiagnosticObserver( Action> onNext, - Action onError = null, - Action onCompleted = null + Action? onError = null, + Action? onCompleted = null ) { - _onNext= onNext; + _onNext= onNext ?? throw new ArgumentNullException(nameof(onNext)); _onError = onError; _onCompleted = onCompleted; } @@ -36,8 +36,8 @@ protected TypedDiagnosticObserver( void IObserver>.OnNext(KeyValuePair value) { - if (value.Value is TOnNext next) _onNext?.Invoke(new KeyValuePair(value.Key, next)); - else if (value.Value == null) _onNext?.Invoke(new KeyValuePair(value.Key, default)); + if (value.Value is TOnNext next) _onNext.Invoke(new KeyValuePair(value.Key, next)); + else if (value.Value == null) _onNext.Invoke(new KeyValuePair(value.Key, default)); else throw new Exception($"{value.Key} received unexpected type {value.Value.GetType()}"); @@ -49,19 +49,19 @@ public abstract class TypedDiagnosticObserver : IObser { private readonly Action> _onNextStart; private readonly Action> _onNextEnd; - private readonly Action _onError; - private readonly Action _onCompleted; + private readonly Action? _onError; + private readonly Action? _onCompleted; /// protected TypedDiagnosticObserver( Action> onNextStart, Action> onNextEnd, - Action onError = null, - Action onCompleted = null + Action? onError = null, + Action? onCompleted = null ) { - _onNextStart = onNextStart; - _onNextEnd = onNextEnd; + _onNextStart = onNextStart ?? throw new ArgumentNullException(nameof(onNextStart)); + _onNextEnd = onNextEnd ?? throw new ArgumentNullException(nameof(onNextEnd)); _onError = onError; _onCompleted = onCompleted; } @@ -72,11 +72,11 @@ protected TypedDiagnosticObserver( void IObserver>.OnNext(KeyValuePair value) { - if (value.Value is TOnNextStart nextStart) _onNextStart?.Invoke(new KeyValuePair(value.Key, nextStart)); - else if (value.Key.EndsWith(".Start") && value.Value is null) _onNextStart?.Invoke(new KeyValuePair(value.Key, default)); + if (value.Value is TOnNextStart nextStart) _onNextStart.Invoke(new KeyValuePair(value.Key, nextStart)); + else if (value.Key.EndsWith(".Start") && value.Value is null) _onNextStart.Invoke(new KeyValuePair(value.Key, default)); - else if (value.Value is TOnNextEnd nextEnd) _onNextEnd?.Invoke(new KeyValuePair(value.Key, nextEnd)); - else if (value.Key.EndsWith(".Stop") && value.Value is null) _onNextEnd?.Invoke(new KeyValuePair(value.Key, default)); + else if (value.Value is TOnNextEnd nextEnd) _onNextEnd.Invoke(new KeyValuePair(value.Key, nextEnd)); + else if (value.Key.EndsWith(".Stop") && value.Value is null) _onNextEnd.Invoke(new KeyValuePair(value.Key, default)); else throw new Exception($"{value.Key} received unexpected type {value.Value.GetType()}"); diff --git a/src/Elastic.Transport/DistributedTransport.cs b/src/Elastic.Transport/DistributedTransport.cs index 7e98e11..3c58341 100644 --- a/src/Elastic.Transport/DistributedTransport.cs +++ b/src/Elastic.Transport/DistributedTransport.cs @@ -9,6 +9,7 @@ using System.Threading; using System.Threading.Tasks; using Elastic.Transport.Diagnostics; +using Elastic.Transport.Diagnostics.Auditing; using Elastic.Transport.Extensions; using Elastic.Transport.Products; @@ -22,27 +23,12 @@ namespace Elastic.Transport; public sealed class DistributedTransport : DistributedTransport { /// - /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on - /// different - /// nodes + /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on + /// different nodes /// - /// The configuration to use for this transport - public DistributedTransport(ITransportConfiguration configurationValues) : base(configurationValues, null, null) { } - - /// - /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on - /// different - /// nodes - /// - /// The configuration to use for this transport - /// In charge of create a new pipeline, safe to pass null to use the default - /// The date time proved to use, safe to pass null to use the default - internal DistributedTransport( - ITransportConfiguration configurationValues, - RequestPipelineFactory? pipelineProvider = null, - DateTimeProvider? dateTimeProvider = null - ) - : base(configurationValues, pipelineProvider, dateTimeProvider) { } + /// The configuration to use for this transport + public DistributedTransport(ITransportConfiguration configuration) + : base(configuration) { } } /// @@ -52,35 +38,26 @@ public class DistributedTransport : ITransport private readonly ProductRegistration _productRegistration; /// - /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on - /// different - /// nodes + /// Transport coordinates the client requests over the node pool nodes and is in charge of falling over on + /// different nodes /// - /// The configuration to use for this transport - /// In charge of create a new pipeline, safe to pass null to use the default - /// The date time proved to use, safe to pass null to use the default - public DistributedTransport( - TConfiguration configurationValues, - RequestPipelineFactory? pipelineProvider = null, - DateTimeProvider? dateTimeProvider = null - ) + /// The configuration to use for this transport + public DistributedTransport(TConfiguration configuration) { - configurationValues.ThrowIfNull(nameof(configurationValues)); - configurationValues.NodePool.ThrowIfNull(nameof(configurationValues.NodePool)); - configurationValues.RequestInvoker.ThrowIfNull(nameof(configurationValues.RequestInvoker)); - configurationValues.RequestResponseSerializer.ThrowIfNull(nameof(configurationValues - .RequestResponseSerializer)); - - _productRegistration = configurationValues.ProductRegistration; - - Configuration = configurationValues; + configuration.ThrowIfNull(nameof(configuration)); + configuration.NodePool.ThrowIfNull(nameof(configuration.NodePool)); + configuration.RequestInvoker.ThrowIfNull(nameof(configuration.RequestInvoker)); + configuration.RequestResponseSerializer.ThrowIfNull(nameof(configuration.RequestResponseSerializer)); + + _productRegistration = configuration.ProductRegistration; + Configuration = configuration; + MemoryStreamFactory = configuration.MemoryStreamFactory; TransportRequestData = new RequestData(Configuration); - RequestPipelineFactory = pipelineProvider ?? new DefaultRequestPipelineFactory(); - DateTimeProvider = dateTimeProvider ?? DefaultDateTimeProvider.Default; + TransportPipeline = Configuration.PipelineProvider.Create(TransportRequestData); } - private DateTimeProvider DateTimeProvider { get; } - private RequestPipelineFactory RequestPipelineFactory { get; } + private RequestPipeline TransportPipeline { get; } + private MemoryStreamFactory MemoryStreamFactory { get; } private RequestData TransportRequestData { get; } /// @@ -133,12 +110,14 @@ private async ValueTask RequestCoreAsync( Configuration.OnRequestDataCreated?.Invoke(requestData); - using var pipeline = RequestPipelineFactory.Create(requestData, DateTimeProvider); + var pipeline = requestData == TransportRequestData ? TransportPipeline : Configuration.PipelineProvider.Create(requestData); + var startedOn = Configuration.DateTimeProvider.Now(); + var auditor = Configuration.DisableAuditTrail.GetValueOrDefault(false) ? null : new Auditor(Configuration.DateTimeProvider); if (isAsync) - await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, cancellationToken).ConfigureAwait(false); + await pipeline.FirstPoolUsageAsync(Configuration.BootstrapLock, auditor, cancellationToken).ConfigureAwait(false); else - pipeline.FirstPoolUsage(Configuration.BootstrapLock); + pipeline.FirstPoolUsage(Configuration.BootstrapLock, auditor); TResponse response = null; @@ -179,10 +158,10 @@ private async ValueTask RequestCoreAsync( try { if (isAsync) - response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, cancellationToken) + response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, auditor, cancellationToken) .ConfigureAwait(false); else - response = pipeline.CallProductEndpoint(endpoint, requestData, data); + response = pipeline.CallProductEndpoint(endpoint, requestData, data, auditor); } catch (PipelineException pipelineException) when (!pipelineException.Recoverable) { @@ -194,12 +173,12 @@ private async ValueTask RequestCoreAsync( } catch (Exception killerException) { - ThrowUnexpectedTransportException(killerException, seenExceptions, endpoint, response, pipeline); + ThrowUnexpectedTransportException(killerException, seenExceptions, endpoint, response, auditor); } } else { - foreach (var node in pipeline.NextNode()) + foreach (var node in pipeline.NextNode(startedOn, auditor)) { attemptedNodes++; endpoint = endpoint with { Node = node }; @@ -215,23 +194,23 @@ private async ValueTask RequestCoreAsync( if (_productRegistration.SupportsSniff) { if (isAsync) - await pipeline.SniffOnStaleClusterAsync(cancellationToken).ConfigureAwait(false); + await pipeline.SniffOnStaleClusterAsync(auditor, cancellationToken).ConfigureAwait(false); else - pipeline.SniffOnStaleCluster(); + pipeline.SniffOnStaleCluster(auditor); } if (_productRegistration.SupportsPing) { if (isAsync) - await PingAsync(pipeline, node, cancellationToken).ConfigureAwait(false); + await PingAsync(pipeline, node, auditor, cancellationToken).ConfigureAwait(false); else - Ping(pipeline, node); + Ping(pipeline, node, auditor); } if (isAsync) - response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, cancellationToken) + response = await pipeline.CallProductEndpointAsync(endpoint, requestData, data, auditor, cancellationToken) .ConfigureAwait(false); else - response = pipeline.CallProductEndpoint(endpoint, requestData, data); + response = pipeline.CallProductEndpoint(endpoint, requestData, data, auditor); if (!response.ApiCallDetails.SuccessOrKnownError) { @@ -240,9 +219,9 @@ private async ValueTask RequestCoreAsync( if (_productRegistration.SupportsSniff) { if (isAsync) - await pipeline.SniffOnConnectionFailureAsync(cancellationToken).ConfigureAwait(false); + await pipeline.SniffOnConnectionFailureAsync(auditor, cancellationToken).ConfigureAwait(false); else - pipeline.SniffOnConnectionFailure(); + pipeline.SniffOnConnectionFailure(auditor); } } } @@ -258,19 +237,19 @@ private async ValueTask RequestCoreAsync( catch (Exception killerException) { if (killerException is OperationCanceledException && cancellationToken.IsCancellationRequested) - pipeline.AuditCancellationRequested(); + pipeline.AuditCancellationRequested(auditor); throw new UnexpectedTransportException(killerException, seenExceptions) { Endpoint = endpoint, ApiCallDetails = response?.ApiCallDetails, - AuditTrail = pipeline.AuditTrail + AuditTrail = auditor }; } if (cancellationToken.IsCancellationRequested) { - pipeline.AuditCancellationRequested(); + pipeline.AuditCancellationRequested(auditor); break; } @@ -288,7 +267,7 @@ private async ValueTask RequestCoreAsync( activity?.SetTag(SemanticConventions.HttpResponseStatusCode, response.ApiCallDetails.HttpStatusCode); activity?.SetTag(OpenTelemetryAttributes.ElasticTransportAttemptedNodes, attemptedNodes); - return FinalizeResponse(endpoint, requestData, data, pipeline, seenExceptions, response); + return FinalizeResponse(endpoint, requestData, data, pipeline, startedOn, auditor, seenExceptions, response); } finally { @@ -299,13 +278,13 @@ private async ValueTask RequestCoreAsync( private static void ThrowUnexpectedTransportException(Exception killerException, List seenExceptions, Endpoint endpoint, - TResponse response, RequestPipeline pipeline + TResponse response, IReadOnlyCollection? auditTrail ) where TResponse : TransportResponse, new() => throw new UnexpectedTransportException(killerException, seenExceptions) { Endpoint = endpoint, ApiCallDetails = response?.ApiCallDetails, - AuditTrail = pipeline.AuditTrail + AuditTrail = auditTrail }; private static void HandlePipelineException( @@ -320,19 +299,25 @@ ref List seenExceptions seenExceptions.Add(ex); } - private TResponse FinalizeResponse(Endpoint endpoint, RequestData requestData, PostData? postData, RequestPipeline pipeline, + private TResponse FinalizeResponse( + Endpoint endpoint, + RequestData requestData, + PostData? postData, + RequestPipeline pipeline, + DateTimeOffset startedOn, + Auditor auditor, List? seenExceptions, TResponse? response ) where TResponse : TransportResponse, new() { if (endpoint.IsEmpty) //foreach never ran - pipeline.ThrowNoNodesAttempted(endpoint, seenExceptions); + pipeline.ThrowNoNodesAttempted(endpoint, auditor, seenExceptions); var callDetails = GetMostRecentCallDetails(response, seenExceptions); - var clientException = pipeline.CreateClientException(response, callDetails, endpoint, requestData, seenExceptions); + var clientException = pipeline.CreateClientException(response, callDetails, endpoint, auditor, startedOn, seenExceptions); if (response?.ApiCallDetails == null) - pipeline.BadResponse(ref response, callDetails, endpoint, requestData, postData, clientException); + pipeline.BadResponse(ref response, callDetails, endpoint, requestData, postData, clientException, auditor); HandleTransportException(requestData, clientException, response); return response; @@ -359,8 +344,8 @@ private void HandleTransportException(RequestData data, Exception clientExceptio a.OriginalException = clientException; //On .NET Core the TransportClient implementation throws exceptions on bad responses //This causes it to behave differently to .NET FULL. We already wrapped the WebException - //under TransportException and it exposes way more information as part of it's - //exception message e.g the the root cause of the server error body. + //under TransportException, and it exposes way more information as part of its + //exception message e.g. the root cause of the server error body. #if NETFRAMEWORK if (a.OriginalException is WebException) a.OriginalException = clientException; @@ -371,30 +356,30 @@ private void HandleTransportException(RequestData data, Exception clientExceptio if (data != null && clientException != null && data.ThrowExceptions) throw clientException; } - private void Ping(RequestPipeline pipeline, Node node) + private void Ping(RequestPipeline pipeline, Node node, Auditor? auditor) { try { - pipeline.Ping(node); + pipeline.Ping(node, auditor); } catch (PipelineException e) when (e.Recoverable) { if (_productRegistration.SupportsSniff) - pipeline.SniffOnConnectionFailure(); + pipeline.SniffOnConnectionFailure(auditor); throw; } } - private async Task PingAsync(RequestPipeline pipeline, Node node, CancellationToken cancellationToken) + private async Task PingAsync(RequestPipeline pipeline, Node node, Auditor? auditor, CancellationToken cancellationToken) { try { - await pipeline.PingAsync(node, cancellationToken).ConfigureAwait(false); + await pipeline.PingAsync(node, auditor, cancellationToken).ConfigureAwait(false); } catch (PipelineException e) when (e.Recoverable) { if (_productRegistration.SupportsSniff) - await pipeline.SniffOnConnectionFailureAsync(cancellationToken).ConfigureAwait(false); + await pipeline.SniffOnConnectionFailureAsync(auditor, cancellationToken).ConfigureAwait(false); throw; } } diff --git a/src/Elastic.Transport/Exceptions/TransportException.cs b/src/Elastic.Transport/Exceptions/TransportException.cs index fe94bb0..602626f 100644 --- a/src/Elastic.Transport/Exceptions/TransportException.cs +++ b/src/Elastic.Transport/Exceptions/TransportException.cs @@ -25,7 +25,7 @@ public class TransportException : Exception public TransportException(string message) : base(message) => FailureReason = PipelineFailure.Unexpected; /// - public TransportException(PipelineFailure failure, string message, Exception innerException) + public TransportException(PipelineFailure failure, string message, Exception? innerException = null) : base(message, innerException) => FailureReason = failure; /// @@ -41,7 +41,7 @@ public TransportException(PipelineFailure failure, string message, TransportResp /// The audit trail keeping track of what happened during the invocation of /// a request, up until the moment of this exception. /// - public IEnumerable AuditTrail { get; internal set; } + public IReadOnlyCollection? AuditTrail { get; internal init; } /// /// The reason this exception occurred was one of the well defined exit points as modelled by diff --git a/src/Elastic.Transport/Extensions/EmptyEnumerator.cs b/src/Elastic.Transport/Extensions/EmptyEnumerator.cs new file mode 100644 index 0000000..cac38ec --- /dev/null +++ b/src/Elastic.Transport/Extensions/EmptyEnumerator.cs @@ -0,0 +1,23 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System.Collections; +using System.Collections.Generic; + +namespace Elastic.Transport.Extensions; + +internal struct EmptyEnumerator : IEnumerator +{ + public T Current => default; + object IEnumerator.Current => Current!; + public bool MoveNext() => false; + + public void Reset() + { + } + + public void Dispose() + { + } +} diff --git a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs index 38874c2..2acd303 100644 --- a/src/Elastic.Transport/Responses/DefaultResponseFactory.cs +++ b/src/Elastic.Transport/Responses/DefaultResponseFactory.cs @@ -20,7 +20,7 @@ namespace Elastic.Transport; /// /// Create an instance of the factory using the provided configuration. /// -internal sealed class DefaultResponseFactory(ITransportConfiguration transportConfiguration) : ResponseFactory +internal sealed class DefaultResponseFactory : ResponseFactory { private readonly ConcurrentDictionary _resolvedBuilders = new() { @@ -31,8 +31,6 @@ internal sealed class DefaultResponseFactory(ITransportConfiguration transportCo [typeof(VoidResponse)] = new VoidResponseBuilder() }; - private readonly ITransportConfiguration? _transportConfiguration = transportConfiguration; - /// public override TResponse Create( Endpoint endpoint, @@ -87,7 +85,8 @@ private async ValueTask CreateCoreAsync( TResponse? response = null; - if (MayHaveBody(statusCode, endpoint.Method, contentLength) && TryResolveBuilder(out var builder)) + if (MayHaveBody(statusCode, endpoint.Method, contentLength) + && TryResolveBuilder(requestData.ProductResponseBuilders, requestData.ResponseBuilders, out var builder)) { var ownsStream = false; @@ -122,17 +121,19 @@ private async ValueTask CreateCoreAsync( response ??= new TResponse(); response.ApiCallDetails = details; return response; - } + } - private bool TryResolveBuilder(out IResponseBuilder builder) where TResponse : TransportResponse, new() + private bool TryResolveBuilder(IReadOnlyCollection productResponseBuilders, + IReadOnlyCollection responseBuilders, out IResponseBuilder builder + ) where TResponse : TransportResponse, new() { if (_resolvedBuilders.TryGetValue(typeof(TResponse), out builder)) return true; - if (TryFindResponseBuilder(_transportConfiguration.ResponseBuilders, _resolvedBuilders, ref builder)) + if (TryFindResponseBuilder(responseBuilders, _resolvedBuilders, ref builder)) return true; - return TryFindResponseBuilder(_transportConfiguration.ProductRegistration.ResponseBuilders, _resolvedBuilders, ref builder); + return TryFindResponseBuilder(productResponseBuilders, _resolvedBuilders, ref builder); static bool TryFindResponseBuilder(IEnumerable responseBuilders, ConcurrentDictionary resolvedBuilders, ref IResponseBuilder builder) { diff --git a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs index 679463d..ceb1717 100644 --- a/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs +++ b/src/Elastic.Transport/Responses/HttpDetails/ApiCallDetails.cs @@ -25,17 +25,17 @@ internal ApiCallDetails() { } /// /// Access to the collection of events that occurred during the request. /// > - public IEnumerable AuditTrail { get; internal set; } + public IReadOnlyCollection? AuditTrail { get; internal set; } /// /// Statistics about the worker and I/O completion port threads at the time of the request. /// - internal IReadOnlyDictionary ThreadPoolStats { get; set; } + internal IReadOnlyDictionary? ThreadPoolStats { get; init; } /// /// Statistics about the number of ports in various TCP states at the time of the request. /// - internal IReadOnlyDictionary TcpStats { get; set; } + internal IReadOnlyDictionary? TcpStats { get; init; } /// /// Information used to debug the request. diff --git a/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs b/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs index e6ce796..369f7a5 100644 --- a/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs +++ b/tests/Elastic.Transport.Tests/Configuration/TransportConfigurationTests.cs @@ -32,12 +32,9 @@ public void SameDefaults() config.Should().BeEquivalentTo(newConfig, c => c .Excluding(p=>p.BootstrapLock) - .Excluding(p=>p.NodePool.LastUpdate) ); config.BootstrapLock.CurrentCount.Should().Be(newConfig.BootstrapLock.CurrentCount); - config.NodePool.LastUpdate - .Should().BeCloseTo(newConfig.NodePool.LastUpdate, TimeSpan.FromSeconds(2)); } #if !NETFRAMEWORK diff --git a/tests/Elastic.Transport.Tests/Plumbing/InMemoryConnectionFactory.cs b/tests/Elastic.Transport.Tests/Plumbing/InMemoryConnectionFactory.cs index 6f1222b..f4eb21d 100644 --- a/tests/Elastic.Transport.Tests/Plumbing/InMemoryConnectionFactory.cs +++ b/tests/Elastic.Transport.Tests/Plumbing/InMemoryConnectionFactory.cs @@ -11,7 +11,7 @@ public static class InMemoryConnectionFactory { public static TransportConfiguration Create(ProductRegistration productRegistration = null) { - var invoker = new InMemoryRequestInvoker(productRegistration); + var invoker = new InMemoryRequestInvoker(); var pool = new SingleNodePool(new Uri("http://localhost:9200")); var settings = new TransportConfiguration(pool, invoker, productRegistration: productRegistration); return settings;