diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java b/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java index 205c1b556a..9b9fc5c454 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import sun.misc.Unsafe; @@ -109,6 +110,8 @@ public class CalculateAverage_royvanrijn { private static final int TABLE_SIZE = 1 << 19; // large enough for the contest. private static final int TABLE_MASK = (TABLE_SIZE - 1); + private static final long SEGMENT_SIZE = 1 << 21; + // Idea of thomaswue, don't wait for slow unmap: private static void spawnWorker() throws IOException { ProcessHandle.Info info = ProcessHandle.current().info(); @@ -130,7 +133,9 @@ public static void main(String[] args) throws Exception { // Calculate input segments. final FileChannel fileChannel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ); final long fileSize = fileChannel.size(); - final long segmentSize = (fileSize + PROCESSORS - 1) / PROCESSORS; + + final AtomicLong segmentCounter = new AtomicLong(0); + final long mapAddress = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global()).address(); final Thread[] parallelThreads = new Thread[PROCESSORS - 1]; @@ -139,16 +144,12 @@ public static void main(String[] args) throws Exception { final ConcurrentHashMap measurements = new ConcurrentHashMap(1 << 10); // We create separate threads for twice the amount of processors. - long lastAddress = mapAddress; final long endOfFile = mapAddress + fileSize; for (int i = 0; i < PROCESSORS - 1; ++i) { - final long fromAddress = lastAddress; - final long toAddress = Math.min(endOfFile, fromAddress + segmentSize); - final Thread thread = new Thread(() -> { // The actual work is done here: - final byte[][] table = processMemoryArea(fromAddress, toAddress, fromAddress == mapAddress); + final byte[][] table = processMemoryArea(segmentCounter, mapAddress, endOfFile); for (byte[] entry : table) { if (entry != null) { @@ -158,11 +159,10 @@ public static void main(String[] args) throws Exception { }); thread.start(); // start a.s.a.p. parallelThreads[i] = thread; - lastAddress = toAddress; } // Use the current thread for the part of memory: - final byte[][] table = processMemoryArea(lastAddress, mapAddress + fileSize, false); + final byte[][] table = processMemoryArea(segmentCounter, mapAddress, endOfFile); for (byte[] entry : table) { if (entry != null) { @@ -433,76 +433,84 @@ private boolean matches16(final byte[] entry) { } } - private static byte[][] processMemoryArea(final long startAddress, final long endAddress, boolean isFileStart) { + private static byte[][] processMemoryArea(final AtomicLong segmentCounter, final long startAddress, final long endAddress) { final byte[][] table = new byte[TABLE_SIZE][]; final byte[][] preConstructedEntries = new byte[PREMADE_ENTRIES][ENTRY_BASESIZE_WHITESPACE + PREMADE_MAX_SIZE]; - final Reader reader = new Reader(startAddress, endAddress, isFileStart); - - byte[] entry; - int entryCount = 0; - - // Find the correct starting position - while (reader.hasNext()) { + while (true) { - reader.processStart(); + long fromAddress = startAddress + segmentCounter.getAndIncrement() * SEGMENT_SIZE; + long toAddress = Math.min(endAddress, fromAddress + SEGMENT_SIZE); - if (reader.readNextFoundDelimiter()) { - // First 16 bytes: - int temperature = reader.readTemperature(); - - // Find or insert the entry: - int index = (int) (reader.hash & TABLE_MASK); - while (true) { - entry = table[index]; - if (entry == null) { - byte[] entryBytes = (entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++] - : new byte[ENTRY_BASESIZE_WHITESPACE + 16]; // with enough room - table[index] = fillEntry16(entryBytes, 16, temperature, reader.readBuffer1, reader.readBuffer2); - break; - } - else if (reader.matches16(entry)) { - updateEntry(entry, temperature); - break; - } - else { - // Move to the next index - index = (index + 1) & TABLE_MASK; - } - } + if (fromAddress >= endAddress) { + return table; } - else { - while (!reader.readNextFoundDelimiter()) { - // nop - } - - int temperature = reader.readTemperature(); - - // Find or insert the entry: - int index = (int) (reader.hash & TABLE_MASK); - while (true) { - entry = table[index]; - if (entry == null) { - int length = reader.entryLength; - byte[] entryBytes = (length < PREMADE_MAX_SIZE && entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++] - : new byte[ENTRY_BASESIZE_WHITESPACE + length]; // with enough room - table[index] = fillEntry(entryBytes, reader.entryStart, length, temperature, reader.readBuffer1, reader.readBuffer2); - break; + final Reader reader = new Reader(fromAddress, toAddress, fromAddress == startAddress); + + byte[] entry; + int entryCount = 0; + + // Find the correct starting position + while (reader.hasNext()) { + + reader.processStart(); + + if (reader.readNextFoundDelimiter()) { + // First 16 bytes: + int temperature = reader.readTemperature(); + + // Find or insert the entry: + int index = (int) (reader.hash & TABLE_MASK); + while (true) { + entry = table[index]; + if (entry == null) { + byte[] entryBytes = (entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++] + : new byte[ENTRY_BASESIZE_WHITESPACE + 16]; // with enough room + table[index] = fillEntry16(entryBytes, 16, temperature, reader.readBuffer1, reader.readBuffer2); + break; + } + else if (reader.matches16(entry)) { + updateEntry(entry, temperature); + break; + } + else { + // Move to the next index + index = (index + 1) & TABLE_MASK; + } } - else if (reader.matches(entry)) { - updateEntry(entry, temperature); - break; + } + else { + while (!reader.readNextFoundDelimiter()) { + // nop } - else { - // Move to the next index - index = (index + 1) & TABLE_MASK; + + int temperature = reader.readTemperature(); + + // Find or insert the entry: + int index = (int) (reader.hash & TABLE_MASK); + while (true) { + entry = table[index]; + if (entry == null) { + int length = reader.entryLength; + byte[] entryBytes = (length < PREMADE_MAX_SIZE && entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++] + : new byte[ENTRY_BASESIZE_WHITESPACE + length]; // with enough room + table[index] = fillEntry(entryBytes, reader.entryStart, length, temperature, reader.readBuffer1, reader.readBuffer2); + break; + } + else if (reader.matches(entry)) { + updateEntry(entry, temperature); + break; + } + else { + // Move to the next index + index = (index + 1) & TABLE_MASK; + } } } - } + } } - return table; } private static boolean compare(final Object object1, final long address1, final Object object2, final long address2) {