Skip to content

Commit

Permalink
Iterate the buffer once and use BigDecimal parsing instead of Double.…
Browse files Browse the repository at this point in the history
…parseDouble
  • Loading branch information
filiphr committed Jan 3, 2024
1 parent a4344e6 commit ce2d9d6
Showing 1 changed file with 96 additions and 52 deletions.
148 changes: 96 additions & 52 deletions src/main/java/dev/morling/onebrc/CalculateAverage_filiphr.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,29 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Initial submission: 1m 35s
* Adding memory mapped files: 0m 55s (based on bjhara's submission)
*
* Initial submission: 1m 35s
* Adding memory mapped files: 0m 55s (based on bjhara's submission)
* Using big decimal and iterating the buffer once: 0m 20s
* <p>
* Using 21.0.1 Temurin with ShenandoahGC on Macbook (Intel) Pro
* `sdk use java 21.0.1-tem`
Expand All @@ -51,22 +53,30 @@ public class CalculateAverage_filiphr {

private static final class Measurement {

private final AtomicReference<Double> min = new AtomicReference<>(Double.POSITIVE_INFINITY);
private final AtomicReference<Double> max = new AtomicReference<>(Double.NEGATIVE_INFINITY);
private final AtomicReference<Double> sum = new AtomicReference<>(0d);
private final AtomicLong count = new AtomicLong(0);

private Measurement combine(double value) {
this.min.accumulateAndGet(value, Math::min);
this.max.accumulateAndGet(value, Math::max);
this.sum.accumulateAndGet(value, Double::sum);
this.count.incrementAndGet();
return this;
private double min = Long.MAX_VALUE;
private double max = Long.MIN_VALUE;
private double sum = 0L;
private long count = 0L;

private void add(double value) {
this.min = Math.min(this.min, value);
this.max = Math.max(this.max, value);
this.sum += value;
this.count++;
}

public static Measurement combine(Measurement m1, Measurement m2) {
Measurement measurement = new Measurement();
measurement.min = Math.min(m1.min, m2.min);
measurement.max = Math.max(m1.max, m2.max);
measurement.sum = m1.sum + m2.sum;
measurement.count = m1.count + m2.count;
return measurement;
}

@Override
public String toString() {
return round(min.get()) + "/" + round(sum.get() / count.get()) + "/" + round(max.get());
return round(min) + "/" + round((sum) / count) + "/" + round(max);
}

private double round(double value) {
Expand All @@ -77,63 +87,97 @@ private double round(double value) {
public static void main(String[] args) throws IOException {
// long start = System.nanoTime();

Map<String, Measurement> measurements = new ConcurrentHashMap<>(700);
Map<String, Measurement> measurements;
try (FileChannel fileChannel = FileChannel.open(Paths.get(FILE), StandardOpenOption.READ)) {
fineChannelStream(fileChannel)
measurements = fineChannelStream(fileChannel)
.parallel()
.forEach(byteBuffer -> {
processBuffer(byteBuffer, (city, temperature) -> {
measurements.computeIfAbsent(city, k -> new Measurement())
.combine(temperature);
});
});
.map(CalculateAverage_filiphr::parseBuffer)
.reduce(Collections.emptyMap(), CalculateAverage_filiphr::mergeMaps);
}

System.out.println(new TreeMap<>(measurements));
// System.out.println("Done in " + (System.nanoTime() - start) / 1000000 + " ms");
}

private static Map<String, Measurement> mergeMaps(Map<String, Measurement> map1, Map<String, Measurement> map2) {
if (map1.isEmpty()) {
return map2;
} else {
Set<String> cities = new HashSet<>(map1.keySet());
cities.addAll(map2.keySet());
Map<String, Measurement> result = HashMap.newHashMap(cities.size());

for (String city : cities) {
Measurement m1 = map1.get(city);
Measurement m2 = map2.get(city);
if (m2 == null) {
// When m2 is null then it is not possible for m1 to be null as well,
// since cities is a union of the map key sets
result.put(city, m1);
} else if (m1 == null) {
// When m1 is null then it is not possible for m2 to be null as well,
// since cities is a union of the map key sets
result.put(city, m2);
} else {
result.put(city, Measurement.combine(m1, m2));
}
}

return result;
}
}

/**
* This is an adapted implementation of the bjhara parseBuffer
*/
private static void processBuffer(ByteBuffer bb, BiConsumer<String, Double> entryConsumer) {
private static Map<String, Measurement> parseBuffer(ByteBuffer bb) {
Map<String, Measurement> measurements = HashMap.newHashMap(415);
int limit = bb.limit();
byte[] buffer = new byte[128];
CharBuffer charBuffer = CharBuffer.allocate(8);

while (bb.position() < limit) {
int currentPosition = bb.position();

// find the ; separator
int separator = currentPosition;
while (separator != limit && bb.get(separator) != ';')
separator++;
int bufferIndex = 0;

// find the end of the line
int end = separator + 1;
while (end != limit && !Character.isWhitespace((char) bb.get(end)))
end++;
// Iterate through the byte buffer and fill the buffer until we find the separator (;)
while (bb.position() < limit) {
byte positionByte = bb.get();
if (positionByte == ';') {
break;
}
buffer[bufferIndex++] = positionByte;
}

// get the name as a string
int nameLength = separator - currentPosition;
bb.get(buffer, 0, nameLength);
String city = new String(buffer, 0, nameLength);
// Create the city
String city = new String(buffer, 0, bufferIndex);

// get rid of the separator
bb.get();
charBuffer.clear();
byte lastPositionByte = '\n';
while (bb.position() < limit) {
byte positionByte = bb.get();
if (positionByte == '\r' || positionByte == '\n') {
lastPositionByte = positionByte;
break;
}
charBuffer.append((char) positionByte);
}

// get the double value
int valueLength = end - separator - 1;
bb.get(buffer, 0, valueLength);
String valueStr = new String(buffer, 0, valueLength);
double value = Double.parseDouble(valueStr);
int position = charBuffer.position();
charBuffer.position(0);
// Create the temperature string
BigDecimal bigDecimal = new BigDecimal(charBuffer.array(), 0, position);
double value = bigDecimal.doubleValue();

entryConsumer.accept(city, value);
measurements.computeIfAbsent(city, k -> new Measurement())
.add(value);

// and get rid of the new line (handle both kinds)
byte newline = bb.get();
if (newline == '\r')
if (lastPositionByte == '\r') {
bb.get();
}
}

return measurements;
}

/**
Expand Down

0 comments on commit ce2d9d6

Please sign in to comment.