From 6cfae86402448bcf2be5f93b1f4872c0457a39e4 Mon Sep 17 00:00:00 2001 From: Matteo Piergiovanni <134913285+piergm@users.noreply.github.com> Date: Thu, 14 Nov 2024 14:50:50 +0100 Subject: [PATCH] preliminary work for single roundtrip per data node --- .../search/AbstractSearchAsyncAction.java | 34 ++++++++++++++----- 1 file changed, 25 insertions(+), 9 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c051f0ca7a6f5..2a8f5d130c447 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -48,8 +48,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.LinkedTransferQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -250,6 +250,12 @@ public final void run() { performPhaseOnShard(shardIndex, shardRoutings, routing); } } + for (PendingExecutions pendingExecutions : pendingExecutionsPerNode.values()) { + if (pendingExecutions.queuedItems.get() < maxConcurrentRequestsPerNode) { + pendingExecutions.semaphore.release(maxConcurrentRequestsPerNode); + pendingExecutions.flushQueue(); + } + } } } @@ -800,11 +806,14 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s private static final class PendingExecutions { private final Semaphore semaphore; - private final ConcurrentLinkedQueue> queue = new ConcurrentLinkedQueue<>(); + private final AtomicInteger queuedItems; + private final int permits; + private final LinkedTransferQueue> queue = new LinkedTransferQueue<>(); PendingExecutions(int permits) { - assert permits > 0 : "not enough permits: " + permits; - semaphore = new Semaphore(permits); + semaphore = new Semaphore(0); + this.permits = permits; + queuedItems = new AtomicInteger(); } void submit(Consumer task) { @@ -812,11 +821,18 @@ void submit(Consumer task) { executeAndRelease(task); } else { queue.add(task); - if (semaphore.tryAcquire()) { - task = pollNextTaskOrReleasePermit(); - if (task != null) { - executeAndRelease(task); - } + if (queuedItems.incrementAndGet() == permits) { + semaphore.release(permits); + } + flushQueue(); + } + } + + void flushQueue() { + if (semaphore.tryAcquire()) { + var task = pollNextTaskOrReleasePermit(); + if (task != null) { + executeAndRelease(task); } } }