/*
 *  Copyright 2023 The original authors
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
16
+
package dev.morling.onebrc;
18
+
import jdk.incubator.vector.ByteVector;
19
+
import jdk.incubator.vector.VectorSpecies;
20
+
import sun.misc.Unsafe;
22
+
import java.io.IOException;
23
+
import java.lang.foreign.*;
24
+
import java.lang.invoke.MethodHandle;
25
+
import java.nio.ByteOrder;
26
+
import java.nio.channels.FileChannel;
27
+
import java.nio.charset.StandardCharsets;
28
+
import java.nio.file.Paths;
29
+
import java.util.Arrays;
30
+
import java.util.concurrent.CompletableFuture;
31
+
import java.util.concurrent.Executors;
32
+
import java.util.stream.IntStream;
34
+
public class CalculateAverage_linl33 {
35
+
private static final String FILE_PATH_PROPERTY = "dev.morling.onebrc.CalculateAverage_linl33.measurementsPath";
36
+
private static final int WEATHER_STATION_LENGTH_MAX = 100;
37
+
private static final long WEATHER_STATION_DISTINCT_MAX = 10_000L;
38
+
private static final int N_THREADS = Runtime.getRuntime().availableProcessors();
40
+
private static final MemorySegment ALL = MemorySegment.NULL.reinterpret(Long.MAX_VALUE);
41
+
private static final VectorSpecies<Byte> BYTE_SPECIES = ByteVector.SPECIES_PREFERRED;
43
+
private static final Thread.Builder THREAD_BUILDER = Thread
45
+
.name("1brc-CalculateAverage-", 0)
46
+
.inheritInheritableThreadLocals(false);
48
+
private static final Unsafe UNSAFE;
51
+
if (ByteOrder.nativeOrder() != ByteOrder.LITTLE_ENDIAN) {
52
+
throw new UnsupportedOperationException("Error: BE JVMs are not supported");
54
+
if ((BYTE_SPECIES.vectorByteSize() & (BYTE_SPECIES.vectorByteSize() - 1)) != 0) {
55
+
throw new UnsupportedOperationException(STR."Unsupported vectorByteSize \{BYTE_SPECIES.vectorByteSize()}");
59
+
var f = Unsafe.class.getDeclaredField("theUnsafe");
60
+
f.setAccessible(true);
61
+
UNSAFE = (Unsafe) f.get(null);
62
+
} catch (NoSuchFieldException | IllegalAccessException e) {
63
+
throw new RuntimeException(e);
67
+
public static void main() throws InterruptedException, IOException {
68
+
final var filePath = Paths.get(System.getProperty(FILE_PATH_PROPERTY, "./measurements.txt"));
70
+
try (final var channel = FileChannel.open(filePath)) {
71
+
final var inputMapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size(), Arena.global());
73
+
final var chunkBounds = calcChunkBounds(inputMapped.address(), inputMapped.byteSize());
74
+
final var maps = new SparseMap[N_THREADS];
76
+
try (final var threadPool = Executors.newFixedThreadPool(N_THREADS, THREAD_BUILDER.factory());
77
+
final var singleThreadExecutor = Executors.newSingleThreadExecutor(Thread.ofVirtual().factory())) {
78
+
final var rootTask = CompletableFuture.runAsync(new CalculateAverageTask(maps, chunkBounds, 0), threadPool);
80
+
final var futures = IntStream
81
+
.range(1, N_THREADS)
82
+
.mapToObj(t -> CompletableFuture
83
+
.runAsync(new CalculateAverageTask(maps, chunkBounds, t), threadPool)
84
+
.runAfterBothAsync(rootTask, () -> maps[0].merge(maps[t]), singleThreadExecutor))
85
+
.toArray(CompletableFuture[]::new);
87
+
CompletableFuture.allOf(futures).join();
90
+
printSorted(maps[0]);
94
+
private static long[] calcChunkBounds(final long mappedAddr, final long fileSizeBytes) {
95
+
final var chunkBounds = new long[N_THREADS + 1];
96
+
chunkBounds[0] = mappedAddr;
97
+
chunkBounds[chunkBounds.length - 1] = mappedAddr + fileSizeBytes;
99
+
final var chunkSize = (fileSizeBytes / N_THREADS) & -CalculateAverageTask.BATCH_SIZE_BYTES;
100
+
for (int i = 1; i < chunkBounds.length - 1; i++) {
101
+
chunkBounds[i] = chunkBounds[i - 1] + chunkSize;
104
+
return chunkBounds;
107
+
private static void printSorted(final SparseMap temperatureMeasurements) {
108
+
final var weatherStations = new AggregatedMeasurement[(int) temperatureMeasurements.size];
109
+
final var nameBuffer = new byte[WEATHER_STATION_LENGTH_MAX];
110
+
var offset = temperatureMeasurements.denseAddress;
111
+
for (int i = 0; i < weatherStations.length; i++, offset += SparseMap.DATA_SCALE * Long.BYTES) {
112
+
final var nameAddr = UNSAFE.getLong(offset);
113
+
final var nameLength = UNSAFE.getInt(offset + Integer.BYTES * 7);
114
+
MemorySegment.copy(ALL, ValueLayout.JAVA_BYTE, nameAddr, nameBuffer, 0, nameLength);
115
+
final var nameStr = new String(nameBuffer, 0, nameLength, StandardCharsets.UTF_8);
116
+
weatherStations[i] = new AggregatedMeasurement(nameStr, i);
119
+
Arrays.sort(weatherStations);
121
+
System.out.print('{');
122
+
for (int i = 0; i < weatherStations.length - 1; i++) {
123
+
printAggMeasurement(weatherStations[i], temperatureMeasurements);
124
+
System.out.print(',');
125
+
System.out.print(' ');
127
+
printAggMeasurement(weatherStations[weatherStations.length - 1], temperatureMeasurements);
128
+
System.out.println('}');
131
+
private static void printAggMeasurement(final AggregatedMeasurement aggMeasurement,
132
+
final SparseMap temperatureMeasurements) {
133
+
final var offset = temperatureMeasurements.denseAddress + SparseMap.DATA_SCALE * Long.BYTES * aggMeasurement.id();
136
+
System.out.print(aggMeasurement.name());
137
+
System.out.print('=');
140
+
printAsDouble(offset + Integer.BYTES * 5);
141
+
System.out.print('/');
144
+
final double total = UNSAFE.getLong(offset + Integer.BYTES * 2);
145
+
final var count = UNSAFE.getInt(offset + Integer.BYTES * 4);
146
+
System.out.print(round(total / count / 10d));
147
+
System.out.print('/');
150
+
printAsDouble(offset + Integer.BYTES * 6);
153
+
private static void printAsDouble(final long addr) {
154
+
final var val = (double) UNSAFE.getInt(addr);
155
+
System.out.print(val / 10d);
158
+
private static double round(final double d) {
159
+
return Math.round(d * 10d) / 10d;
162
+
private static class CalculateAverageTask implements Runnable {
163
+
public static final int BATCH_SIZE_BYTES = BYTE_SPECIES.vectorByteSize();
165
+
private final SparseMap[] maps;
166
+
private final long[] chunkBounds;
167
+
private final long chunkStart;
168
+
private final long chunkEnd;
169
+
private final int t;
171
+
private SparseMap map;
173
+
public CalculateAverageTask(SparseMap[] maps, long[] chunkBounds, int t) {
175
+
this.chunkBounds = chunkBounds;
176
+
this.chunkStart = chunkBounds[t];
177
+
this.chunkEnd = chunkBounds[t + 1];
182
+
public void run() {
183
+
this.maps[this.t] = new SparseMap();
184
+
this.map = this.maps[this.t];
186
+
var lineStart = this.chunkBounds[0];
187
+
// walk back to find the previous '\n' and use it as lineStart
188
+
for (long i = this.chunkStart - 1; i > this.chunkBounds[0]; i--) {
189
+
if (UNSAFE.getByte(i) == (byte) '\n') {
190
+
lineStart = i + 1L;
195
+
final var vectorLimit = this.chunkStart + ((this.chunkEnd - this.chunkStart) & -BYTE_SPECIES.vectorByteSize());
196
+
for (long i = this.chunkStart; i < vectorLimit; i += BYTE_SPECIES.vectorByteSize()) {
197
+
var lfMask = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, i, ByteOrder.nativeOrder())
201
+
final var lfCount = Long.bitCount(lfMask);
202
+
for (int j = 0; j < lfCount; j++) {
203
+
final var lfPosRelative = Long.numberOfTrailingZeros(lfMask);
204
+
final var lfAddress = i + lfPosRelative;
205
+
processLine(lineStart, lfAddress);
207
+
lineStart = lfAddress + 1L;
208
+
// unset the lowest set bit, should compile to BLSR
209
+
lfMask &= lfMask - 1L;
213
+
if (vectorLimit != this.chunkEnd) {
214
+
processTrailingBytes(lineStart, vectorLimit, this.chunkEnd);
218
+
private void processTrailingBytes(long lineStart,
221
+
for (long i = start; i < end; i++) {
222
+
final var b = UNSAFE.getByte(i);
223
+
if (b != (byte) '\n') {
227
+
processLine(lineStart, i);
232
+
private void processLine(final long lineStart, final long lfAddress) {
233
+
// read 5 bytes before '\n'
234
+
// the temperature is formatted to 1 decimal place
235
+
// therefore the shortest temperature value is 0.0
236
+
// so there are always at least 5 bytes between the location name and '\n'
237
+
final var trailing5Bytes = UNSAFE.getLong(lfAddress - 5);
238
+
final int trailingDWordRaw = (int) (trailing5Bytes >>> 8);
240
+
// select the low nibble for each byte, '0'-'9' -> 0-9, ';' -> 11, '-' -> 13
241
+
final var trailingDWordLowNibble = trailingDWordRaw & 0x0f_0f_0f_0f;
242
+
// parse the 2 digits around the decimal point (note that these 2 digits must be present)
243
+
final var trailingDigitsParsed = (trailingDWordLowNibble * 0x00_0a_00_01) >>> 24;
245
+
// this byte must be ('-' & 0xf), (';' & 0xf), or a valid digit (0-9)
246
+
final var secondHighestByte = trailingDWordLowNibble & 0xf;
248
+
var temperature = trailingDigitsParsed;
249
+
var lineLength = lfAddress - lineStart - 4;
251
+
if (secondHighestByte > 9) {
252
+
if (secondHighestByte == ('-' & 0xf)) {
254
+
temperature = -temperature;
259
+
temperature += secondHighestByte * 100;
261
+
final var isNegative = (trailing5Bytes & 0xffL) == '-';
264
+
temperature = -temperature;
268
+
this.map.putEntry(lineStart, (int) lineLength, temperature);
273
+
* Open addressing, linear probing hash map backed by off-heap memory
275
+
private static class SparseMap {
276
+
private static final int TRUNCATED_HASH_BITS = 26;
277
+
// max # of unique keys
278
+
private static final long DENSE_SIZE = WEATHER_STATION_DISTINCT_MAX;
279
+
// max hash code (exclusive)
280
+
private static final long SPARSE_SIZE = 1L << (TRUNCATED_HASH_BITS + 1);
281
+
private static final long DATA_SCALE = 4;
283
+
public final long sparseAddress;
284
+
public final long denseAddress;
287
+
public SparseMap() {
288
+
var arena = new MallocArena(Arena.global());
289
+
var callocArena = new CallocArena(Arena.global());
293
+
final var sparse = callocArena.allocate(ValueLayout.JAVA_LONG, SPARSE_SIZE);
294
+
this.sparseAddress = (sparse.address() + MallocArena.MAX_ALIGN) & -MallocArena.MAX_ALIGN;
296
+
final var dense = arena.allocate(ValueLayout.JAVA_LONG, DENSE_SIZE * DATA_SCALE);
297
+
this.denseAddress = (dense.address() + MallocArena.MAX_ALIGN) & -MallocArena.MAX_ALIGN;
300
+
public void putEntry(final long keyAddress, final int keyLength, final int value) {
301
+
final var hash = hash(keyAddress, keyLength);
302
+
this.putEntryInternal(hash, keyAddress, keyLength, value, 1, value, value);
305
+
private void putEntryInternal(final long hash,
306
+
final long keyAddress,
307
+
final int keyLength,
308
+
final long temperature,
310
+
final int temperatureMin,
311
+
final int temperatureMax) {
312
+
final var sparseOffset = this.sparseAddress + truncateHash(hash) * Long.BYTES;
314
+
for (long n = 0, sparseLinearOffset = sparseOffset; n < WEATHER_STATION_DISTINCT_MAX; n++, sparseLinearOffset += Long.BYTES) {
315
+
final var denseOffset = UNSAFE.getLong(sparseLinearOffset);
316
+
if (denseOffset == 0L) {
317
+
this.add(sparseLinearOffset, keyAddress, keyLength, temperature, count, temperatureMin, temperatureMax);
322
+
if (isCollision(keyAddress, keyLength, denseOffset)) {
326
+
final var currTotal = UNSAFE.getLong(denseOffset + Integer.BYTES * 2);
327
+
UNSAFE.putLong(denseOffset + Integer.BYTES * 2, currTotal + temperature); // total
329
+
final var currCount = UNSAFE.getInt(denseOffset + Integer.BYTES * 4);
330
+
UNSAFE.putInt(denseOffset + Integer.BYTES * 4, currCount + count); // count
332
+
final var currMin = UNSAFE.getInt(denseOffset + Integer.BYTES * 5);
333
+
if (temperatureMin < currMin) {
334
+
UNSAFE.putInt(denseOffset + Integer.BYTES * 5, temperatureMin); // min
337
+
final var currMax = UNSAFE.getInt(denseOffset + Integer.BYTES * 6);
338
+
if (temperatureMax > currMax) {
339
+
UNSAFE.putInt(denseOffset + Integer.BYTES * 6, temperatureMax); // max
346
+
public void merge(final SparseMap other) {
347
+
final var otherSize = other.size;
348
+
for (long i = 0, offset = other.denseAddress; i < otherSize; i++, offset += DATA_SCALE * Long.BYTES) {
349
+
final var keyAddress = UNSAFE.getLong(offset);
350
+
final var keyLength = UNSAFE.getInt(offset + Integer.BYTES * 7);
351
+
final var hash = hash(keyAddress, keyLength);
353
+
this.putEntryInternal(
357
+
UNSAFE.getLong(offset + Integer.BYTES * 2),
358
+
UNSAFE.getInt(offset + Integer.BYTES * 4),
359
+
UNSAFE.getInt(offset + Integer.BYTES * 5),
360
+
UNSAFE.getInt(offset + Integer.BYTES * 6));
364
+
private void add(final long sparseOffset,
365
+
final long keyAddress,
366
+
final int keyLength,
367
+
final long temperature,
369
+
final int temperatureMin,
370
+
final int temperatureMax) {
371
+
// new entry, initialize sparse and dense
372
+
final var denseOffset = this.denseAddress + this.size * DATA_SCALE * Long.BYTES;
373
+
UNSAFE.putLong(sparseOffset, denseOffset);
375
+
UNSAFE.putLong(denseOffset, keyAddress);
376
+
UNSAFE.putLong(denseOffset + Integer.BYTES * 2, temperature);
377
+
UNSAFE.putInt(denseOffset + Integer.BYTES * 4, count);
378
+
UNSAFE.putInt(denseOffset + Integer.BYTES * 5, temperatureMin);
379
+
UNSAFE.putInt(denseOffset + Integer.BYTES * 6, temperatureMax);
380
+
UNSAFE.putInt(denseOffset + Integer.BYTES * 7, keyLength);
383
+
private static boolean isCollision(final long keyAddress, final int keyLength, final long denseOffset) {
384
+
// key length compare is unnecessary
386
+
final var entryKeyAddress = UNSAFE.getLong(denseOffset);
387
+
return mismatch(keyAddress, entryKeyAddress, keyLength);
390
+
private static boolean mismatch(final long leftAddr, final long rightAddr, final int length) {
391
+
// key length compare is unnecessary
392
+
// strings compared through delimiter byte ';'
394
+
final var loopBound = length >= (BYTE_SPECIES.vectorByteSize() - 1) ? ((length + 1) & -BYTE_SPECIES.vectorByteSize()) : 0;
395
+
for (long i = 0; i < loopBound; i += BYTE_SPECIES.vectorByteSize()) {
396
+
final var l = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, leftAddr + i, ByteOrder.nativeOrder());
397
+
final var r = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, rightAddr + i, ByteOrder.nativeOrder());
398
+
if (!l.eq(r).allTrue()) {
403
+
final var l = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, leftAddr + loopBound, ByteOrder.nativeOrder());
404
+
final var r = ByteVector.fromMemorySegment(BYTE_SPECIES, ALL, rightAddr + loopBound, ByteOrder.nativeOrder());
405
+
final var eqMask = l.eq(r).toLong();
407
+
// LE compare to add 1 to length
408
+
return Long.numberOfTrailingZeros(~eqMask) <= (length - loopBound);
409
+
// to support platforms without TZCNT, the check can be replaced with
410
+
// a comparison to lowestZero = ~eqMask & (eqMask + 1)
413
+
// Use the leading and trailing few bytes as hash
414
+
// this performs better than computing a good hash
415
+
private static long hash(final long keyAddress, final int keyLength) {
416
+
final var leadingQWord = UNSAFE.getLong(keyAddress);
417
+
// the constant is the 64 bit FNV-1 offset basis
418
+
final var hash = -3750763034362895579L ^ leadingQWord;
419
+
if (keyLength < Integer.BYTES) {
420
+
// the key is at least 2 bytes (if you count the delimiter)
421
+
return hash & 0xffffL;
424
+
final var trailingDWord = UNSAFE.getLong(keyAddress + keyLength - Integer.BYTES) & 0xffffffffL;
425
+
// only the lower dword in hash is guaranteed to exist so shift left 32
426
+
return (hash << Integer.SIZE) ^ trailingDWord;
430
+
private static long truncateHash(final long hash) {
431
+
return ((hash >>> TRUNCATED_HASH_BITS) ^ hash) & ((1L << TRUNCATED_HASH_BITS) - 1L);
435
+
private static class MallocArena implements Arena {
436
+
public static final long MAX_ALIGN = 1L << 21;
438
+
protected static final Linker LINKER = Linker.nativeLinker();
439
+
protected static final AddressLayout C_POINTER = (AddressLayout) LINKER.canonicalLayouts().get("void*");
440
+
protected static final ValueLayout C_SIZE_T = (ValueLayout) LINKER.canonicalLayouts().get("size_t");
441
+
private static final MethodHandle MALLOC = LINKER.downcallHandle(
442
+
LINKER.defaultLookup().find("malloc").orElseThrow(),
443
+
FunctionDescriptor.of(C_POINTER, C_SIZE_T),
444
+
Linker.Option.critical(false));
445
+
private static final MethodHandle FREE = LINKER.downcallHandle(
446
+
LINKER.defaultLookup().find("free").orElseThrow(),
447
+
FunctionDescriptor.ofVoid(C_POINTER),
448
+
Linker.Option.critical(false));
449
+
protected static final MethodHandle CALLOC = LINKER.downcallHandle(
450
+
LINKER.defaultLookup().find("calloc").orElseThrow(),
451
+
FunctionDescriptor.of(C_POINTER, C_SIZE_T, C_SIZE_T),
452
+
Linker.Option.critical(false));
454
+
private final Arena arena;
456
+
public MallocArena(Arena arena) {
457
+
this.arena = arena;
461
+
public MemorySegment allocate(final long byteSize, final long byteAlignment) {
462
+
return malloc(byteSize + MAX_ALIGN).reinterpret(this, MallocArena::free);
466
+
public MemorySegment.Scope scope() {
467
+
return arena.scope();
471
+
public void close() {
475
+
private static MemorySegment malloc(final long byteSize) {
477
+
return ((MemorySegment) MALLOC.invokeExact(byteSize)).reinterpret(byteSize);
479
+
catch (Throwable e) {
480
+
throw new RuntimeException(e);
484
+
protected static void free(final MemorySegment address) {
486
+
FREE.invokeExact(address);
488
+
catch (Throwable e) {
489
+
throw new RuntimeException(e);
494
+
private static class CallocArena extends MallocArena {
495
+
public CallocArena(Arena arena) {
500
+
public MemorySegment allocate(final long byteSize, final long byteAlignment) {
501
+
return calloc(byteSize + MAX_ALIGN).reinterpret(this, MallocArena::free);
504
+
private static MemorySegment calloc(final long byteSize) {
506
+
return ((MemorySegment) MallocArena.CALLOC.invokeExact(1L, byteSize)).reinterpret(byteSize);
508
+
catch (Throwable e) {
509
+
throw new RuntimeException(e);
514
+
private record AggregatedMeasurement(String name, long id) implements Comparable<AggregatedMeasurement> {
517
+
public int compareTo(final AggregatedMeasurement other) {
518
+
return name.compareTo(other.name);