diff --git a/calculate_average_gnabyl.sh b/calculate_average_gnabyl.sh index 14c449a4f..9baa93d07 100755 --- a/calculate_average_gnabyl.sh +++ b/calculate_average_gnabyl.sh @@ -19,5 +19,5 @@ # source "$HOME/.sdkman/bin/sdkman-init.sh" # sdk use java 21.0.1-graal 1>&2 -JAVA_OPTS="" -time java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_gnabyl +JAVA_OPTS="-XX:+UseStringDeduplication" +java $JAVA_OPTS --class-path target/average-1.0.0-SNAPSHOT.jar dev.morling.onebrc.CalculateAverage_gnabyl diff --git a/prepare_gnabyl.sh b/prepare_gnabyl.sh new file mode 100755 index 000000000..06b81c4dd --- /dev/null +++ b/prepare_gnabyl.sh @@ -0,0 +1,20 @@ +#!/bin/bash +# +# Copyright 2023 The original authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Uncomment below to use sdk +source "$HOME/.sdkman/bin/sdkman-init.sh" +sdk use java 21.0.1-graal 1>&2 diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java index 7c9769e7e..3c27119e5 100644 --- a/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java +++ b/src/main/java/dev/morling/onebrc/CalculateAverage_gnabyl.java @@ -16,25 +16,30 @@ package dev.morling.onebrc; import java.io.IOException; +import java.io.PrintWriter; import java.io.RandomAccessFile; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; public class CalculateAverage_gnabyl { private static final String FILE = "./measurements.txt"; - private static final int NB_CHUNKS = 8; + private static final int NB_CHUNKS = Runtime.getRuntime().availableProcessors(); - private static record Chunk(long start, int bytesCount, MappedByteBuffer mappedByteBuffer) { + private static Map stationNameMap = new ConcurrentHashMap<>(10000, 0.9f, NB_CHUNKS); + + private static record Chunk(int bytesCount, MappedByteBuffer mappedByteBuffer) { } private static int reduceSizeToFitLineBreak(FileChannel channel, long startPosition, int startSize) @@ -61,9 +66,9 @@ private static int reduceSizeToFitLineBreak(FileChannel channel, long startPosit return realSize; } - private static List readChunks(long nbChunks) throws IOException { + private static List readChunks(int nbChunks) throws IOException { RandomAccessFile file = new RandomAccessFile(FILE, "rw"); - List res = new ArrayList<>(); + List res = new ArrayList<>(nbChunks); FileChannel channel = file.getChannel(); long bytesCount = channel.size(); long bytesPerChunk = bytesCount / nbChunks; @@ -71,16 +76,18 @@ private static List readChunks(long nbChunks) throws IOException { // Memory map the file in read-only mode // TODO: Optimize using threads long currentPosition = 0; + int startSize; + int realSize; for (int i = 0; i < nbChunks; i++) { - int startSize = (int) bytesPerChunk; - int realSize = startSize; + startSize = (int) bytesPerChunk; + realSize = startSize; if (i == nbChunks - 1) { realSize = (int) (bytesCount - currentPosition); MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, realSize); - res.add(new Chunk(currentPosition, realSize, mappedByteBuffer)); + res.add(new Chunk(realSize, mappedByteBuffer)); break; } @@ -90,7 +97,7 @@ private static List readChunks(long nbChunks) throws IOException { MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, currentPosition, realSize); - res.add(new Chunk(currentPosition, realSize, mappedByteBuffer)); + res.add(new Chunk(realSize, mappedByteBuffer)); currentPosition += realSize; } @@ -101,32 +108,32 @@ private static List readChunks(long nbChunks) throws IOException { } private static class StationData { - private double sum, min, max; - private long count; + private float sum, min, max; + private int count; - public StationData(double value) { + public StationData(float value) { this.count = 1; this.sum = value; this.min = value; this.max = value; } - public void update(double value) { + public void update(float value) { this.count++; this.sum += value; this.min = Math.min(this.min, value); this.max = Math.max(this.max, value); } - public double getMean() { + public float getMean() { return sum / count; } - public double getMin() { + public float getMin() { return min; } - public double getMax() { + public float getMax() { return max; } @@ -139,47 +146,44 @@ public void mergeWith(StationData other) { } - static double round(double value) { - return Math.round(value * 10.0) / 10.0; + static float round(float value) { + return Math.round(value * 10.0f) * 0.1f; } private static class ChunkResult { - private Map data; + private Map data; public ChunkResult() { data = new HashMap<>(); } - public StationData getData(String name) { - return data.get(name); + public StationData getData(int hash) { + return data.get(hash); } - public void addStation(String name, double value) { - this.data.put(name, new StationData(value)); + public void addStation(int hash, float value) { + this.data.put(hash, new StationData(value)); } public void print() { - var stationNames = new ArrayList(this.data.keySet()); - Collections.sort(stationNames); - System.out.print("{"); - for (int i = 0; i < stationNames.size() - 1; i++) { - var name = stationNames.get(i); - var stationData = data.get(name); - System.out.printf("%s=%.1f/%.1f/%.1f, ", name, round(stationData.getMin()), - round(stationData.getMean()), - round(stationData.getMax())); - } - var name = stationNames.get(stationNames.size() - 1); - var stationData = data.get(name); - System.out.printf("%s=%.1f/%.1f/%.1f", name, round(stationData.getMin()), - round(stationData.getMean()), - round(stationData.getMax())); - System.out.println("}"); + PrintWriter out = new PrintWriter(System.out); + out.println( + this.data.keySet().parallelStream() + .map(hash -> { + var stationData = data.get(hash); + var name = stationNameMap.get(hash); + return String.format("%s=%.1f/%.1f/%.1f", name, round(stationData.getMin()), + round(stationData.getMean()), + round(stationData.getMax())); + }) + .sorted((a, b) -> a.split("=")[0].compareTo(b.split("=")[0])) + .collect(Collectors.joining(", ", "{", "}"))); + out.flush(); } public void mergeWith(ChunkResult other) { - for (Map.Entry entry : other.data.entrySet()) { - String stationName = entry.getKey(); + for (Map.Entry entry : other.data.entrySet()) { + int stationName = entry.getKey(); StationData otherStationData = entry.getValue(); StationData thisStationData = this.data.get(stationName); @@ -201,16 +205,22 @@ private static ChunkResult processChunk(Chunk chunk) { chunk.mappedByteBuffer().get(data); // Process each line - String stationName; - double value; + float value; int iSplit, iEol; StationData stationData; - long negative; + int negative; + int hash, prime = 31; + Set seenHashes = new HashSet<>(10000, 0.9f); for (int offset = 0; offset < data.length; offset++) { // Find station name + hash = 0; for (iSplit = offset; data[iSplit] != ';'; iSplit++) { + hash = (hash << 5) - hash + (data[iSplit] & 0xFF); + } + if (!seenHashes.contains(hash)) { + seenHashes.add(hash); + stationNameMap.put(hash, new String(data, offset, iSplit - offset, StandardCharsets.UTF_8)); } - stationName = new String(data, offset, iSplit - offset, StandardCharsets.UTF_8); // Find value iSplit++; @@ -222,7 +232,7 @@ private static ChunkResult processChunk(Chunk chunk) { continue; } if (data[iEol] == '.') { - value = value + (data[iEol + 1] - 48) / 10.0; + value = value + (data[iEol + 1] - 48) * 0.1f; iEol += 2; break; } @@ -231,10 +241,10 @@ private static ChunkResult processChunk(Chunk chunk) { value *= negative; // Init & count - stationData = result.getData(stationName); + stationData = result.getData(hash); if (stationData == null) { - result.addStation(stationName, value); + result.addStation(hash, value); } else { stationData.update(value); @@ -247,32 +257,8 @@ private static ChunkResult processChunk(Chunk chunk) { } private static ChunkResult processAllChunks(List chunks) throws InterruptedException, ExecutionException { - // var globalRes = new ChunkResult(); - // for (var chunk : chunks) { - // var chunkRes = processChunk(chunk); - // globalRes.mergeWith(chunkRes); - // } - // return globalRes; - - List> computeTasks = new ArrayList<>(); - - for (Chunk chunk : chunks) { - computeTasks.add(CompletableFuture.supplyAsync(() -> processChunk(chunk))); - } - - ChunkResult globalRes = null; - - for (CompletableFuture completedTask : computeTasks) { - ChunkResult chunkRes = completedTask.get(); - if (globalRes == null) { - globalRes = completedTask.get(); - } - else { - globalRes.mergeWith(chunkRes); - } - } - - return globalRes; + return chunks.parallelStream().map(CalculateAverage_gnabyl::processChunk).collect(ChunkResult::new, + ChunkResult::mergeWith, ChunkResult::mergeWith); } public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {