Skip to content

Commit

Permalink
preliminary work for single roundtrip per data node
Browse files Browse the repository at this point in the history
  • Loading branch information
piergm committed Nov 14, 2024
1 parent 5fe3e46 commit 6cfae86
Showing 1 changed file with 25 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -250,6 +250,12 @@ public final void run() {
performPhaseOnShard(shardIndex, shardRoutings, routing);
}
}
for (PendingExecutions pendingExecutions : pendingExecutionsPerNode.values()) {
if (pendingExecutions.queuedItems.get() < maxConcurrentRequestsPerNode) {
pendingExecutions.semaphore.release(maxConcurrentRequestsPerNode);
pendingExecutions.flushQueue();
}
}
}
}

Expand Down Expand Up @@ -800,23 +806,33 @@ protected final ShardSearchRequest buildShardSearchRequest(SearchShardIterator s

private static final class PendingExecutions {
private final Semaphore semaphore;
private final ConcurrentLinkedQueue<Consumer<Releasable>> queue = new ConcurrentLinkedQueue<>();
private final AtomicInteger queuedItems;
private final int permits;
private final LinkedTransferQueue<Consumer<Releasable>> queue = new LinkedTransferQueue<>();

PendingExecutions(int permits) {
assert permits > 0 : "not enough permits: " + permits;
semaphore = new Semaphore(permits);
semaphore = new Semaphore(0);
this.permits = permits;
queuedItems = new AtomicInteger();
}

void submit(Consumer<Releasable> task) {
if (semaphore.tryAcquire()) {
executeAndRelease(task);
} else {
queue.add(task);
if (semaphore.tryAcquire()) {
task = pollNextTaskOrReleasePermit();
if (task != null) {
executeAndRelease(task);
}
if (queuedItems.incrementAndGet() == permits) {
semaphore.release(permits);
}
flushQueue();
}
}

void flushQueue() {
if (semaphore.tryAcquire()) {
var task = pollNextTaskOrReleasePermit();
if (task != null) {
executeAndRelease(task);
}
}
}
Expand Down

0 comments on commit 6cfae86

Please sign in to comment.