Skip to content

Commit

Permalink
Processing smaller segments
Browse files Browse the repository at this point in the history
  • Loading branch information
royvanrijn committed Jan 30, 2024
1 parent 2dcf876 commit da4b38d
Showing 1 changed file with 73 additions and 65 deletions.
138 changes: 73 additions & 65 deletions src/main/java/dev/morling/onebrc/CalculateAverage_royvanrijn.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import sun.misc.Unsafe;
Expand Down Expand Up @@ -109,6 +110,8 @@ public class CalculateAverage_royvanrijn {
private static final int TABLE_SIZE = 1 << 19; // large enough for the contest.
private static final int TABLE_MASK = (TABLE_SIZE - 1);

private static final long SEGMENT_SIZE = 1 << 21;

// Idea of thomaswue, don't wait for slow unmap:
private static void spawnWorker() throws IOException {
ProcessHandle.Info info = ProcessHandle.current().info();
Expand All @@ -130,7 +133,9 @@ public static void main(String[] args) throws Exception {
// Calculate input segments.
final FileChannel fileChannel = FileChannel.open(Path.of(FILE), StandardOpenOption.READ);
final long fileSize = fileChannel.size();
final long segmentSize = (fileSize + PROCESSORS - 1) / PROCESSORS;

final AtomicLong segmentCounter = new AtomicLong(0);

final long mapAddress = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize, Arena.global()).address();

final Thread[] parallelThreads = new Thread[PROCESSORS - 1];
Expand All @@ -139,16 +144,12 @@ public static void main(String[] args) throws Exception {
final ConcurrentHashMap<String, byte[]> measurements = new ConcurrentHashMap(1 << 10);

// We create separate threads for twice the amount of processors.
long lastAddress = mapAddress;
final long endOfFile = mapAddress + fileSize;
for (int i = 0; i < PROCESSORS - 1; ++i) {

final long fromAddress = lastAddress;
final long toAddress = Math.min(endOfFile, fromAddress + segmentSize);

final Thread thread = new Thread(() -> {
// The actual work is done here:
final byte[][] table = processMemoryArea(fromAddress, toAddress, fromAddress == mapAddress);
final byte[][] table = processMemoryArea(segmentCounter, mapAddress, endOfFile);

for (byte[] entry : table) {
if (entry != null) {
Expand All @@ -158,11 +159,10 @@ public static void main(String[] args) throws Exception {
});
thread.start(); // start a.s.a.p.
parallelThreads[i] = thread;
lastAddress = toAddress;
}

// Use the current thread for the part of memory:
final byte[][] table = processMemoryArea(lastAddress, mapAddress + fileSize, false);
final byte[][] table = processMemoryArea(segmentCounter, mapAddress, endOfFile);

for (byte[] entry : table) {
if (entry != null) {
Expand Down Expand Up @@ -433,76 +433,84 @@ private boolean matches16(final byte[] entry) {
}
}

private static byte[][] processMemoryArea(final long startAddress, final long endAddress, boolean isFileStart) {
private static byte[][] processMemoryArea(final AtomicLong segmentCounter, final long startAddress, final long endAddress) {

final byte[][] table = new byte[TABLE_SIZE][];
final byte[][] preConstructedEntries = new byte[PREMADE_ENTRIES][ENTRY_BASESIZE_WHITESPACE + PREMADE_MAX_SIZE];

final Reader reader = new Reader(startAddress, endAddress, isFileStart);

byte[] entry;
int entryCount = 0;

// Find the correct starting position
while (reader.hasNext()) {
while (true) {

reader.processStart();
long fromAddress = startAddress + segmentCounter.getAndIncrement() * SEGMENT_SIZE;
long toAddress = Math.min(endAddress, fromAddress + SEGMENT_SIZE);

if (reader.readNextFoundDelimiter()) {
// First 16 bytes:
int temperature = reader.readTemperature();

// Find or insert the entry:
int index = (int) (reader.hash & TABLE_MASK);
while (true) {
entry = table[index];
if (entry == null) {
byte[] entryBytes = (entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++]
: new byte[ENTRY_BASESIZE_WHITESPACE + 16]; // with enough room
table[index] = fillEntry16(entryBytes, 16, temperature, reader.readBuffer1, reader.readBuffer2);
break;
}
else if (reader.matches16(entry)) {
updateEntry(entry, temperature);
break;
}
else {
// Move to the next index
index = (index + 1) & TABLE_MASK;
}
}
if (fromAddress >= endAddress) {
return table;
}
else {
while (!reader.readNextFoundDelimiter()) {
// nop
}

int temperature = reader.readTemperature();

// Find or insert the entry:
int index = (int) (reader.hash & TABLE_MASK);
while (true) {
entry = table[index];
if (entry == null) {
int length = reader.entryLength;
byte[] entryBytes = (length < PREMADE_MAX_SIZE && entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++]
: new byte[ENTRY_BASESIZE_WHITESPACE + length]; // with enough room
table[index] = fillEntry(entryBytes, reader.entryStart, length, temperature, reader.readBuffer1, reader.readBuffer2);
break;
final Reader reader = new Reader(fromAddress, toAddress, fromAddress == startAddress);

byte[] entry;
int entryCount = 0;

// Find the correct starting position
while (reader.hasNext()) {

reader.processStart();

if (reader.readNextFoundDelimiter()) {
// First 16 bytes:
int temperature = reader.readTemperature();

// Find or insert the entry:
int index = (int) (reader.hash & TABLE_MASK);
while (true) {
entry = table[index];
if (entry == null) {
byte[] entryBytes = (entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++]
: new byte[ENTRY_BASESIZE_WHITESPACE + 16]; // with enough room
table[index] = fillEntry16(entryBytes, 16, temperature, reader.readBuffer1, reader.readBuffer2);
break;
}
else if (reader.matches16(entry)) {
updateEntry(entry, temperature);
break;
}
else {
// Move to the next index
index = (index + 1) & TABLE_MASK;
}
}
else if (reader.matches(entry)) {
updateEntry(entry, temperature);
break;
}
else {
while (!reader.readNextFoundDelimiter()) {
// nop
}
else {
// Move to the next index
index = (index + 1) & TABLE_MASK;

int temperature = reader.readTemperature();

// Find or insert the entry:
int index = (int) (reader.hash & TABLE_MASK);
while (true) {
entry = table[index];
if (entry == null) {
int length = reader.entryLength;
byte[] entryBytes = (length < PREMADE_MAX_SIZE && entryCount < PREMADE_ENTRIES) ? preConstructedEntries[entryCount++]
: new byte[ENTRY_BASESIZE_WHITESPACE + length]; // with enough room
table[index] = fillEntry(entryBytes, reader.entryStart, length, temperature, reader.readBuffer1, reader.readBuffer2);
break;
}
else if (reader.matches(entry)) {
updateEntry(entry, temperature);
break;
}
else {
// Move to the next index
index = (index + 1) & TABLE_MASK;
}
}
}
}

}
}
return table;
}

private static boolean compare(final Object object1, final long address1, final Object object2, final long address2) {
Expand Down

0 comments on commit da4b38d

Please sign in to comment.