diff --git a/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java b/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java
index 1150f4296..f8b78a050 100644
--- a/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java
+++ b/src/main/java/dev/morling/onebrc/CalculateAverage_shipilev.java
@@ -31,19 +31,32 @@ public class CalculateAverage_shipilev {
 
-    // This might not be the fastest implementation one can do.
-    // When working on this implementation, I set the bar as follows.
+    // Detour: This implementation tries to balance speed and readability.
     //
-    // This implementation uses vanilla and standard Java as much as possible,
-    // without relying on Unsafe tricks and preview features. If and when
-    // those are used, they should be guarded by a feature flag. This would
-    // allow running vanilla implementation if anything goes off the rails.
+    // While the original contest suggests we pull off every trick in the
+    // book to get peak performance, here we set a more pragmatic goal:
+    // how fast can we get without going too far into hacks? Or, to put it
+    // another way: what would a reasonably fast implementation look like,
+    // one that would *also* pass code review in a reasonable project, be
+    // usable in production without waking people up in the middle of the
+    // night, and keep working through JDK updates, upgrades, and migrations?
+    //
+    // To that end, this implementation uses vanilla and standard Java as much
+    // as possible, without relying on Unsafe tricks and preview features.
+    // When any non-standard things are used, they are guarded by a feature
+    // flag, which allows us to cleanly turn them off if anything goes off
+    // the rails.
+    //
+    // For performance reasons, the implementation takes extra care to stay
+    // reliably parallel, surviving I/O stalls and scheduling oddities. This
+    // would not show up in laboratory conditions, but it is necessary for
+    // reliable code in production. The implementation also tries not to miss
+    // simple optimizations, without going too far into the woods.
+    //
+    // Note that some of the magic that makes this workload run fast under
+    // evaluation conditions is done separately in the invocation script.
+    // Most of that is only needed for short-running scenarios. In real life,
+    // this code would likely run well without any of it.
     //
-    // This implementation also covers the realistic scenario: the I/O is
-    // actually slow and jittery. To that end, making sure we can feed
-    // the parsing code under slow I/O is as important as getting the
-    // parsing fast. Current evaluation env keeps the input data on RAM disk,
-    // which hides this important part.
 
     // ========================= Tunables =========================
 
@@ -57,17 +70,19 @@ public class CalculateAverage_shipilev {
 
     // Fixed size of the measurements map. Must be a power of two. Should
     // be large enough to accommodate all the station names. Rules say there are
-    // 10K station names max, so anything >> 16K works well.
+    // 10K station names max, so anything well above 16K works well.
    private static final int MAP_SIZE = 1 << 15;
 
     // The largest mmap-ed chunk. This can be Integer.MAX_VALUE, but
     // it is normally tuned down to seed the workers with smaller mmap regions
-    // more efficiently.
+    // more efficiently. This also allows us to incrementally unmap chunks as
+    // we complete working on them.
     private static final int MMAP_CHUNK_SIZE = Integer.MAX_VALUE / 32;
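For illustration, the chunked mapping that MMAP_CHUNK_SIZE drives boils down to a loop like the sketch below. This is a standalone sketch, not part of the patch: the real code does this inside RootTask and additionally aligns chunk boundaries to line breaks; the file name is the contest's measurements.txt.

    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.StandardOpenOption;

    public class ChunkedMapSketch {
        static final int MMAP_CHUNK_SIZE = Integer.MAX_VALUE / 32;

        public static void main(String[] args) throws Exception {
            try (FileChannel fc = FileChannel.open(Path.of("measurements.txt"), StandardOpenOption.READ)) {
                long size = fc.size();
                for (long start = 0; start < size; ) {
                    long len = Math.min(MMAP_CHUNK_SIZE, size - start);
                    // Map a bounded chunk: smaller chunks seed workers faster,
                    // and each chunk can be unmapped as soon as its worker is done.
                    MappedByteBuffer chunk = fc.map(FileChannel.MapMode.READ_ONLY, start, len);
                    // ... hand `chunk` off to a worker here ...
                    start += len;
                }
            }
        }
    }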
 
     // The largest slice as unit of work, processed serially by a worker.
     // Set it too low and there would be more tasks and less batching, but
     // more parallelism. Set it too high, and the reverse would be true.
+    // Something around a large page would likely hit the right balance.
     private static final int UNIT_SLICE_SIZE = 4 * 1024 * 1024;
 
     // Employ direct unmapping techniques to alleviate the cost of system
@@ -80,6 +95,7 @@ public class CalculateAverage_shipilev {
     // ========================= Storage =========================
 
     // Thread-local measurement maps, each thread gets one.
+    // This allows workers to work nearly unimpeded, without synchronization.
     // Even though crude, avoid lambdas here to alleviate startup costs.
     private static final ThreadLocal<MeasurementsMap> MAPS = ThreadLocal.withInitial(new Supplier<>() {
         @Override
@@ -90,20 +106,21 @@ public MeasurementsMap get() {
         }
     });
 
-    // After worker threads finish, the data is available here. One just needs
-    // to merge it a little.
+    // After worker threads finish, the data is available here. The reporting
+    // code pulls the maps from here once all workers finish.
     private static final ConcurrentLinkedQueue<MeasurementsMap> ALL_MAPS = new ConcurrentLinkedQueue<>();
 
-    // Releasable mmaped buffers that workers are done with. These can be un-mapped
-    // in background. Part of the protocol to shutdown the background activity is to
-    // issue the poison pill.
+    // Releasable mmap-ed buffers that workers are done with. These can be un-mapped
+    // in the background. The main thread waits on this queue until it gets the
+    // poison pill from the root task.
     private static final LinkedBlockingQueue<ByteBuffer> RELEASABLE_BUFFERS = new LinkedBlockingQueue<>();
     private static final ByteBuffer RELEASABLE_BUFFER_POISON_PILL = ByteBuffer.allocate(1);
 
     // ========================= MEATY GRITTY PARTS: PARSE AND AGGREGATE =========================
 
     public static final class Bucket {
-        // Raw station name, its hash, and prefixes.
+        // Raw station name, encoded as two prefixes and the name tail,
+        // plus its total length and hash.
         public final byte[] nameTail;
         public final int len;
         public final int hash;
@@ -118,7 +135,8 @@ public static final class Bucket {
 
         public Bucket(ByteBuffer slice, int begin, int end, int hash, int temp) {
             len = end - begin;
 
-            // Also pick up any prefixes to simplify future matches.
+            // Decode the station name. It is handy to have a few prefixes
+            // available to simplify matches later.
             int tailStart = 0;
             if (len >= 8) {
                 prefix1 = slice.getInt(begin + 0);
@@ -135,12 +153,15 @@ else if (len >= 4) {
                 prefix2 = 0;
             }
 
-            // The rest goes to tail byte array. We are checking it names on hot-path.
+            // The rest goes to the tail byte array. We read it on the hot path.
             // Therefore, it is convenient to keep allocation for names near the buckets.
+            // One can avoid this by carefully recording the tail in separate fields,
+            // like the prefixes above, but this is simple enough and gains enough perf.
             int tailLen = len - tailStart;
             nameTail = new byte[tailLen];
             slice.get(begin + tailStart, nameTail, 0, tailLen);
 
+            // Seed the bucket with the initial value.
             this.hash = hash;
             this.sum = temp;
             this.count = 1;
@@ -148,7 +169,7 @@ else if (len >= 4) {
             this.min = temp;
             this.max = temp;
         }
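To make the prefix encoding above concrete, here is a tiny standalone demo; the class and variable names are made up for illustration and do not come from the patch. For a six-byte name like "Lisbon", the constructor takes the len >= 4 branch: prefix1 packs 'L','i','s','b' into one little-endian int, and "on" goes to the tail array.

    import java.nio.ByteBuffer;
    import java.nio.ByteOrder;
    import java.nio.charset.StandardCharsets;

    public class PrefixDemo {
        public static void main(String[] args) {
            byte[] name = "Lisbon".getBytes(StandardCharsets.UTF_8); // len = 6
            ByteBuffer bb = ByteBuffer.wrap(name).order(ByteOrder.LITTLE_ENDIAN);

            // len >= 4 but < 8: one int prefix covers "Lisb", the tail is "on".
            int prefix1 = bb.getInt(0); // 'L' | 'i'<<8 | 's'<<16 | 'b'<<24 = 0x6273694C
            byte[] tail = { name[4], name[5] };

            System.out.printf("prefix1 = 0x%08X, tail = %s%n",
                    prefix1, new String(tail, StandardCharsets.UTF_8));
        }
    }

A name comparison can then start with a single int comparison against prefix1 instead of a byte-wise loop, which is exactly what the matches() helper below exploits.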
 
-        // Little helper method to compare the array with given bytebuffer range.
+        // Little helper method to compare the array with a given ByteBuffer range.
         public boolean matches(ByteBuffer cand, int begin, int end) {
             int origLen = len;
             int candLen = end - begin;
@@ -156,7 +177,7 @@ public boolean matches(ByteBuffer cand, int begin, int end) {
                 return false;
             }
 
-            // Check the prefixes first, to simplify the matches.
+            // Check the prefixes first, if we can.
             int tailStart = 0;
             if (origLen >= 8) {
                 if (prefix1 != cand.getInt(begin)) {
                     return false;
                 }
@@ -183,6 +204,7 @@ else if (origLen >= 4) {
             return true;
         }
 
+        // Check if the current Bucket matches another one.
         public boolean matches(Bucket other) {
             return len == other.len &&
                     prefix1 == other.prefix1 &&
@@ -190,9 +212,14 @@ public boolean matches(Bucket other) {
                     Arrays.equals(nameTail, other.nameTail);
         }
 
+        // Merge in a single temp value. Hot path; should be fairly efficient.
         public void merge(int value) {
             sum += value;
             count++;
+
+            // We rarely do these updates, so the branches below are almost
+            // never taken. Writing them as explicit branches instead of
+            // Math.{min,max} improves performance a bit.
             if (value < min) {
                 min = value;
             }
@@ -201,6 +228,7 @@ public void merge(int value) {
             }
         }
 
+        // Merge in a whole bucket. Called during reporting; not a hot path.
         public void merge(Bucket s) {
             sum += s.sum;
             count += s.count;
@@ -209,7 +237,8 @@ public void merge(Bucket s) {
         }
 
         public Row toRow() {
-            // Reconstruct the name
+            // Reconstruct the name first. The prefixes and the tail were copied
+            // from the little-endian slice, so we need to match the endianness here.
             ByteBuffer bb = ByteBuffer.allocate(len);
             bb.order(ByteOrder.LITTLE_ENDIAN);
             if (len >= 4) {
@@ -231,7 +260,7 @@ public Row toRow() {
     // Quick and dirty linear-probing hash map. YOLO.
     public static final class MeasurementsMap {
         // Individual map buckets. Inlining these straight into map complicates
-        // the implementation without the sensible performance improvement.
+        // the implementation without much of a performance improvement.
         // The map is likely sparse, so whatever footprint loss we have due to
         // Bucket headers we gain by allocating the buckets lazily. The memory
         // dereference costs are still high in both cases. The additional benefit
@@ -240,14 +269,14 @@ public static final class MeasurementsMap {
         private final Bucket[] buckets = new Bucket[MAP_SIZE];
 
         // Fast path is inlined in seqCompute. This is a slow-path that is taken
-        // when something is off. We normally do not enter here.
+        // rarely, usually when there is a hash collision. We normally do not enter here.
         private void updateSlow(ByteBuffer name, int begin, int end, int hash, int temp) {
             int idx = hash & (MAP_SIZE - 1);
 
             while (true) {
                 Bucket cur = buckets[idx];
                 if (cur == null) {
-                    // No bucket yet, lucky us. Create the bucket with it.
+                    // No bucket yet, lucky us. Create the bucket and be done.
                     buckets[idx] = new Bucket(name, begin, end, hash, temp);
                     return;
                 }
@@ -287,9 +316,9 @@ else if ((cur.hash == other.hash) && cur.matches(other)) {
             }
         }
 
-        // Convert from internal representation to the rows.
-        // This does several major things: filters away null-s, instantates full Strings,
-        // and computes stats.
+        // Convert from the internal representation to the rows. This does several
+        // major things: filters away null-s, instantiates full Strings, and
+        // computes the final rows.
         public int fill(Row[] rows) {
             int idx = 0;
             for (Bucket bucket : buckets) {
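As an aside, the probing scheme in updateSlow is plain linear probing over a power-of-two table, which makes the index arithmetic a simple mask. A minimal sketch of the lookup logic, with String keys standing in for the real ByteBuffer ranges (the names here are illustrative, not from the patch):

    public class ProbeSketch {
        static final int MAP_SIZE = 1 << 15; // must stay a power of two

        static int probe(String[] keys, String key, int hash) {
            int idx = hash & (MAP_SIZE - 1);          // mask instead of modulo
            while (true) {
                String cur = keys[idx];
                if (cur == null || cur.equals(key)) {
                    return idx;                       // empty slot to claim, or a match
                }
                idx = (idx + 1) & (MAP_SIZE - 1);     // collision: step to the next slot
            }
        }
    }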
@@ -308,12 +337,15 @@ public static final class ParsingTask extends CountedCompleter<Void> {
         private final MappedByteBuffer mappedBuf;
         private final ByteBuffer buf;
 
+        // Entered from the root task; records the original mmap-ed slice
+        // for later cleanup.
         public ParsingTask(CountedCompleter<?> p, MappedByteBuffer mappedBuf) {
             super(p);
             this.mappedBuf = mappedBuf;
             this.buf = mappedBuf;
         }
 
+        // Entered from the other parsing tasks.
         public ParsingTask(CountedCompleter<?> p, ByteBuffer buf) {
             super(p);
             this.mappedBuf = null;
@@ -334,6 +366,10 @@ public void compute() {
 
         @Override
         public void onCompletion(CountedCompleter<?> caller) {
+            // FJP API: Called when this task completes. At that point, we know
+            // the mmap-ed slice is not needed anymore, and we can give it out
+            // for unmapping. We do not unmap here; let the main thread handle
+            // it for us, as we go on doing other hot work.
             if (DIRECT_UNMMAPS && (mappedBuf != null)) {
                 RELEASABLE_BUFFERS.offer(mappedBuf);
             }
@@ -342,7 +378,7 @@ public void onCompletion(CountedCompleter<?> caller) {
         private void internalCompute() throws Exception {
             int len = buf.limit();
             if (len > UNIT_SLICE_SIZE) {
-                // Split in half.
+                // Still a large chunk; split it in half.
                 int mid = len / 2;
 
                 // Figure out the boundary that does not split the line.
@@ -363,13 +399,17 @@ private void internalCompute() throws Exception {
                 new ParsingTask(this, buf.slice(mid, len - mid)).compute();
             }
             else {
+                // Small enough chunk; time to process it.
                 // The call to seqCompute would normally be non-inlined.
                 // Do setup stuff here to save inlining budget.
                 MeasurementsMap map = MAPS.get();
 
                 // Force the order we need for bit extraction to work. This fits
                 // most of the hardware very well without introducing platform
-                // dependencies.
+                // dependencies. Note that it would be wrong to use nativeOrder()
+                // here, because we _need_ a particular byte ordering for our
+                // computations to work. It just so happens that most hardware
+                // we have is LE.
                 buf.order(ByteOrder.LITTLE_ENDIAN);
 
                 // Go!
@@ -387,10 +427,12 @@ private void seqCompute(MeasurementsMap map, ByteBuffer origSlice, int length) throws Exception {
             // object, which allows compiler to trust its fields more thoroughly.
             ByteBuffer slice = origSlice.slice();
 
-            // Do the same endianness as the original slice.
+            // The new slice lost its endianness setting; set it to match the original slice.
             slice.order(ByteOrder.LITTLE_ENDIAN);
 
-            // Touch the buffer once to let the common checks to fire once for this slice.
+            // Touch the buffer once to let the compiler eject the common checks
+            // for this slice from the loop below. This is an odd, flaky, and
+            // sometimes desperate, but safe thing to do.
             slice.get(0);
 
             int idx = 0;
@@ -418,47 +460,46 @@ private void seqCompute(MeasurementsMap map, ByteBuffer origSlice, int length) throws Exception {
                 int nameEnd = idx - 1;
 
                 // Parse out the temperature. The rules specify temperatures
-                // are within -99.9..99.9. We implicitly look ahead for
-                // negative sign and carry the negative multiplier, if found.
-                // After that, we just need to reconstruct the temperature from
-                // two or three digits. The aggregation code expects temperatures
-                // at 10x scale.
-
+                // are within -99.9..99.9. This means that even in the shortest
+                // case of "0.0", we are not out of bounds for the int-sized read.
                 int intTemp = slice.getInt(idx);
                 int neg = 1;
                 if ((intTemp & 0xFF) == '-') {
                     // Unlucky, there is a sign. Record it, shift one byte and read
                     // the remaining digit again. Surprisingly, doing a second read
-                    // is not worse than reading into long and trying to do bit
-                    // shifts on it.
+                    // is not significantly worse than reading into a long and trying
+                    // to do bit shifts on it. But it is significantly simpler.
                     neg = -1;
                     intTemp >>>= 8;
                     intTemp |= slice.get(idx + 4) << 24;
                     idx++;
                 }
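To make the bit-twiddling below concrete, here is a worked example for the positive two-digit case (illustrative only, not part of the patch). For a slice tail of "4.2\n", the little-endian int read yields byte 0 = '4', byte 1 = '.', byte 2 = '2', byte 3 = '\n', and the extraction produces 42, i.e. 4.2 at the 10x scale the aggregation expects:

    public class TempParseDemo {
        public static void main(String[] args) {
            // "4.2\n" as one little-endian int: '4' in the low byte, '\n' on top.
            int intTemp = '4' | ('.' << 8) | ('2' << 16) | ('\n' << 24);

            // '\n' in the top byte selects the short "d.d" form.
            if ((intTemp >>> 24) == '\n') {
                int temp = ((intTemp & 0xFF) - '0') * 10    // integer digit: 4 -> 40
                        + ((intTemp >> 16) & 0xFF) - '0';   // fraction digit: 2
                System.out.println(temp);                   // prints 42
            }
        }
    }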
 
-                // Since the sign is consumed, we are only left with two cases:
+                // Since the sign is consumed, we are only left with two cases,
+                // which means we can trivially extract the number from the int.
                 int temp = 0;
                 if ((intTemp >>> 24) == '\n') {
-                    // EOL-digitL-point-digitH
+                    // Case 1: EOL-digitL-point-digitH
                     temp = (((intTemp & 0xFF)) - '0') * 10 +
                            ((intTemp >> 16) & 0xFF) - '0';
                     idx += 4;
                 }
                 else {
-                    // digitL-point-digitH-digitHH
+                    // Case 2: digitL-point-digitH-digitHH
                     temp = (((intTemp & 0xFF)) - '0') * 100 +
                            (((intTemp >> 8) & 0xFF) - '0') * 10 +
                            (((intTemp >>> 24)) - '0');
                     idx += 5;
                 }
+
+                // All done, just flip the sign, if needed.
                 temp *= neg;
 
                 // Time to update!
                 Bucket bucket = buckets[nameHash & (MAP_SIZE - 1)];
                 if ((bucket != null) && (nameHash == bucket.hash) && bucket.matches(slice, nameBegin, nameEnd)) {
-                    // Lucky fast path, existing bucket hit. Most of the time we complete here.
+                    // Lucky fast path: matching bucket hit. Most of the time we complete here.
                     bucket.merge(temp);
                 }
                 else {
@@ -475,9 +516,8 @@ private void seqCompute(MeasurementsMap map, ByteBuffer origSlice, int length) throws Exception {
     // task and let it split, but unfortunately buffer API does not allow us
     // "long" start-s and length-s. So we have to chunk at least by mmap-ed
     // size first. It is a CountedCompleter for the same reason ParsingTask is.
-    // This also gives us a very nice opportunity to complete the work on
-    // a given mmap slice, while there is still other work to do. This allows
-    // us to unmap slices on the go.
+    // This also gives us a very nice opportunity to process mmap-ed chunks
+    // one by one, thus allowing incremental unmaps.
     public static final class RootTask extends CountedCompleter<Void> {
         public RootTask() {
             super(null);
@@ -516,7 +556,7 @@ private void internalCompute() throws Exception {
                 }
                 end = minEnd + w;
 
-                // Fork out the large slice
+                // Fork out the large slice.
                 long len = end - start;
                 MappedByteBuffer slice = fc.map(FileChannel.MapMode.READ_ONLY, start, len);
                 start += len;
@@ -524,7 +564,7 @@ private void internalCompute() throws Exception {
 
                 // FJP API: Announce we have a pending task before forking.
                 addToPendingCount(1);
 
-                // ...and fork it
+                // ...and fork it!
                 new ParsingTask(this, slice).fork();
             }
 
@@ -537,6 +577,9 @@ private void internalCompute() throws Exception {
 
         @Override
         public void onCompletion(CountedCompleter<?> caller) {
+            // FJP API: This is called when the root task completes, along with
+            // all of its subtasks. The processing is then done, and we can go
+            // and tell the main thread about that.
             try {
                 RELEASABLE_BUFFERS.put(RELEASABLE_BUFFER_POISON_PILL);
             }
@@ -558,7 +601,8 @@ public static void main(String[] args) throws Exception {
 
         // While the root task is working, prepare what we need for the
         // end of the run. Go and try to report something to prepare the
-        // reporting code for execution.
+        // reporting code for execution. This prepares classes, storage,
+        // and some profiles for the eventual execution.
         MeasurementsMap map = new MeasurementsMap();
         Row[] rows = new Row[MAP_SIZE];
         StringBuilder sb = new StringBuilder(16384);
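For context, the consumer side of RELEASABLE_BUFFERS is not visible in these hunks. Under the protocol described above (the main thread waits until the root task sends the poison pill), it would take roughly the following shape; this is an assumed sketch, and `unmap` is a hypothetical helper standing in for whatever DIRECT_UNMMAPS-guarded mechanism the file actually uses:

    // Assumed shape of the main-thread drain loop; not shown in this diff.
    static void drainReleasableBuffers() throws InterruptedException {
        ByteBuffer buf;
        while ((buf = RELEASABLE_BUFFERS.take()) != RELEASABLE_BUFFER_POISON_PILL) {
            // Workers are done with this mmap-ed slice; release it eagerly,
            // e.g. via an Unsafe.invokeCleaner-style hook.
            unmap((MappedByteBuffer) buf); // hypothetical helper
        }
    }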