Skip to content

Commit

Permalink
serkan-ozal's final submission:
Browse files Browse the repository at this point in the history
- start worker thread in advance
- increase region count
  • Loading branch information
serkan-ozal committed Jan 31, 2024
1 parent 7dcf707 commit e5c5aa0
Showing 1 changed file with 29 additions and 11 deletions.
40 changes: 29 additions & 11 deletions src/main/java/dev/morling/onebrc/CalculateAverage_serkan_ozal.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class CalculateAverage_serkan_ozal {
private static final int THREAD_COUNT = Runtime.getRuntime().availableProcessors(); // getIntegerConfig("THREAD_COUNT", Runtime.getRuntime().availableProcessors());
private static final boolean USE_VTHREADS = false; // getBooleanConfig("USE_VTHREADS", false);
private static final int VTHREAD_COUNT = 1024; // getIntegerConfig("VTHREAD_COUNT", 1024);
private static final int REGION_COUNT = 256; // getIntegerConfig("REGION_COUNT", -1);
private static final int REGION_COUNT = 512; // getIntegerConfig("REGION_COUNT", -1);
private static final boolean USE_SHARED_ARENA = true; // getBooleanConfig("USE_SHARED_ARENA", true);
private static final boolean USE_SHARED_REGION = true; // getBooleanConfig("USE_SHARED_REGION", true);
private static final int MAP_CAPACITY = 1 << 17; // getIntegerConfig("MAP_CAPACITY", 1 << 17);
Expand Down Expand Up @@ -130,8 +131,18 @@ public static void main(String[] args) throws Exception {
}

List<Task> tasks = new ArrayList<>(regionCount);
// Split whole file into regions and create tasks for each region
Queue<Task> sharedTasks = new ConcurrentLinkedQueue<>();
Request request = new Request(arena, sharedTasks, result);
List<Future<Response>> futures = new ArrayList<>(regionCount);

// Start region processors earlier to process tasks for each region
for (int i = 0; i < concurrency; i++) {
RegionProcessor regionProcessor = createRegionProcessor(request);
Future<Response> future = executor.submit(regionProcessor);
futures.add(future);
}

// Split whole file into regions and create tasks for each region
for (int i = 0; i < regionCount; i++) {
long endPos = Math.min(fileSize, startPos + regionSize);
// Lines might split into different regions.
Expand All @@ -144,15 +155,9 @@ public static void main(String[] args) throws Exception {
startPos = closestLineEndPos;
}

Queue<Task> sharedTasks = new ConcurrentLinkedQueue<>(tasks);

// Start region processors to process tasks for each region
for (int i = 0; i < concurrency; i++) {
Request request = new Request(arena, sharedTasks, result);
RegionProcessor regionProcessor = createRegionProcessor(request);
Future<Response> future = executor.submit(regionProcessor);
futures.add(future);
}
// Submit all tasks
sharedTasks.addAll(tasks);
request.start();

// Wait processors to complete
for (Future<Response> future : futures) {
Expand Down Expand Up @@ -247,12 +252,14 @@ public Thread newThread(Runnable r) {
*/
private static class RegionProcessor implements Callable<Response> {

private final Request request;
private final Arena arena;
private final Queue<Task> sharedTasks;
private final Result result;
private OpenMap map;

private RegionProcessor(Request request) {
this.request = request;
this.arena = request.arena;
this.sharedTasks = request.sharedTasks;
this.result = request.result;
Expand Down Expand Up @@ -282,6 +289,8 @@ private void processRegion() throws Exception {
// If no shared global memory arena is used, create and use its own local memory arena
Arena a = arenaGiven ? arena : Arena.ofConfined();
try {
request.waitForStart();

for (Task task = sharedTasks.poll(); task != null; task = sharedTasks.poll()) {
boolean regionGiven = task.region != null;
MemorySegment r = regionGiven
Expand Down Expand Up @@ -562,13 +571,22 @@ private static final class Request {
private final Arena arena;
private final Queue<Task> sharedTasks;
private final Result result;
private final CountDownLatch started = new CountDownLatch(1);

private Request(Arena arena, Queue<Task> sharedTasks, Result result) {
this.arena = arena;
this.sharedTasks = sharedTasks;
this.result = result;
}

private void waitForStart() throws InterruptedException {
started.await();
}

private void start() {
started.countDown();
}

}

/**
Expand Down

0 comments on commit e5c5aa0

Please sign in to comment.