Skip to content

Commit

Permalink
fixed concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
piergm committed Nov 14, 2024
1 parent 6cfae86 commit 97a2fcd
Showing 1 changed file with 5 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -807,22 +807,25 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s
private static final class PendingExecutions {
private final Semaphore semaphore;
private final AtomicInteger queuedItems;
private final AtomicBoolean released;
private final int permits;
private final LinkedTransferQueue<Consumer<Releasable>> queue = new LinkedTransferQueue<>();

PendingExecutions(int permits) {
semaphore = new Semaphore(0);
this.permits = permits;
semaphore = new Semaphore(0);
queuedItems = new AtomicInteger();
released = new AtomicBoolean(false);
}

void submit(Consumer<Releasable> task) {
if (semaphore.tryAcquire()) {
executeAndRelease(task);
} else {
queue.add(task);
if (queuedItems.incrementAndGet() == permits) {
if (queuedItems.incrementAndGet() >= permits && released.get() == false) {
semaphore.release(permits);
released.set(true);
}
flushQueue();
}
Expand Down

0 comments on commit 97a2fcd

Please sign in to comment.