From 97a2fcd9f6364b89cd1802907a52b0d0b08f1374 Mon Sep 17 00:00:00 2001 From: Matteo Piergiovanni <134913285+piergm@users.noreply.github.com> Date: Thu, 14 Nov 2024 15:40:05 +0100 Subject: [PATCH] fixed concurrency --- .../action/search/AbstractSearchAsyncAction.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 2a8f5d130c447..82fd5a9083d73 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -807,13 +807,15 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s private static final class PendingExecutions { private final Semaphore semaphore; private final AtomicInteger queuedItems; + private final AtomicBoolean released; private final int permits; private final LinkedTransferQueue> queue = new LinkedTransferQueue<>(); PendingExecutions(int permits) { - semaphore = new Semaphore(0); this.permits = permits; + semaphore = new Semaphore(0); queuedItems = new AtomicInteger(); + released = new AtomicBoolean(false); } void submit(Consumer task) { @@ -821,8 +823,9 @@ void submit(Consumer task) { executeAndRelease(task); } else { queue.add(task); - if (queuedItems.incrementAndGet() == permits) { + if (queuedItems.incrementAndGet() >= permits && released.get() == false) { semaphore.release(permits); + released.set(true); } flushQueue(); }