···
2
+
* Copyright 2023 The original authors
4
+
* Licensed under the Apache License, Version 2.0 (the "License");
5
+
* you may not use this file except in compliance with the License.
6
+
* You may obtain a copy of the License at
8
+
* http://www.apache.org/licenses/LICENSE-2.0
10
+
* Unless required by applicable law or agreed to in writing, software
11
+
* distributed under the License is distributed on an "AS IS" BASIS,
12
+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+
* See the License for the specific language governing permissions and
14
+
* limitations under the License.
16
+
package dev.morling.onebrc;
20
+
import java.nio.channels.*;
21
+
import java.util.stream.*;
24
+
/* A simple implementation that memory maps the file, reads chunks in parallel and minimises allocation without any unsafe.
26
+
* Timings on 4 core i7-7500U CPU @ 2.70GHz:
27
+
* average_baseline: 4m48s
30
+
public class CalculateAverage_ianopolous {
32
+
public static final int MAX_LINE_LENGTH = 107;
33
+
public static final int MAX_STATIONS = 10000;
35
+
public static void main(String[] args) {
36
+
File input = new File("./measurements.txt");
37
+
long filesize = input.length();
38
+
long chunkSize = 256 * 1024 * 1024;
39
+
int nChunks = (int) ((filesize + chunkSize - 1) / chunkSize);
40
+
List<HashMap<String, Stat>> allResults = IntStream.range(0, nChunks).mapToObj(i -> {
41
+
HashMap<String, Stat> results = new HashMap(512);
42
+
parseStats(i * chunkSize, Math.min((i + 1) * chunkSize, filesize), results);
44
+
}).parallel().toList();
45
+
HashMap<String, Stat> result = allResults.getFirst();
46
+
for (int i = 1; i < allResults.size(); ++i) {
47
+
for (Map.Entry<String, Stat> entry : allResults.get(i).entrySet()) {
48
+
Stat current = result.putIfAbsent(entry.getKey(), entry.getValue());
49
+
if (current != null) {
50
+
current.merge(entry.getValue());
55
+
System.out.println(new TreeMap<>(result));
58
+
public record Station(String name, ByteBuffer buf) {
61
+
public static boolean matchingStationBytes(int start, int end, MappedByteBuffer buffer, Station existing) {
62
+
buffer.position(start);
63
+
for (int i = start; i < end; i++) {
64
+
if (existing.buf.get(i - start) != buffer.get(i))
70
+
public static Station parseStation(int start, int end, int hash, MappedByteBuffer buffer, List<List<Station>> stations) {
71
+
int index = Math.floorMod(hash, MAX_STATIONS);
72
+
List<Station> matches = stations.get(index);
73
+
if (matches == null) {
74
+
List<Station> value = new ArrayList<>();
75
+
byte[] stationBuffer = new byte[end - start];
76
+
buffer.position(start);
77
+
buffer.get(stationBuffer);
78
+
String name = new String(stationBuffer);
79
+
Station res = new Station(name, ByteBuffer.wrap(stationBuffer));
81
+
stations.set(index, value);
85
+
for (int i = 0; i < matches.size(); i++) {
86
+
Station s = matches.get(i);
87
+
if (matchingStationBytes(start, end, buffer, s))
90
+
byte[] stationBuffer = new byte[end - start];
91
+
buffer.position(start);
92
+
buffer.get(stationBuffer);
93
+
Station res = new Station(new String(stationBuffer), ByteBuffer.wrap(stationBuffer));
99
+
public static void parseStats(long startByte, long endByte, Map<String, Stat> results) {
101
+
RandomAccessFile file = new RandomAccessFile("./measurements.txt", "r");
102
+
long maxEnd = Math.min(file.length(), endByte + MAX_LINE_LENGTH);
103
+
long len = maxEnd - startByte;
104
+
if (len > Integer.MAX_VALUE)
105
+
throw new RuntimeException("Segment size must fit into an int");
106
+
int maxDone = (int) (endByte - startByte);
107
+
MappedByteBuffer buffer = file.getChannel().map(FileChannel.MapMode.READ_ONLY, startByte, len);
109
+
// read first partial line
110
+
if (startByte > 0) {
111
+
for (int i = 0; i < MAX_LINE_LENGTH; i++) {
112
+
byte b = buffer.get(i);
120
+
List<List<Station>> stations = new ArrayList<>(MAX_STATIONS);
121
+
for (int i = 0; i < MAX_STATIONS; i++)
122
+
stations.add(null);
123
+
int lineStart = done;
125
+
long temperature = 0;
127
+
boolean negative = false;
128
+
while (done < maxDone) {
129
+
Station station = null;
130
+
for (int i = done; i < done + MAX_LINE_LENGTH && i < maxEnd; i++) {
131
+
byte b = buffer.get(i);
134
+
Stat res = results.get(station.name);
135
+
temperature = negative ? -temperature : temperature;
137
+
res.add(temperature);
141
+
res.add(temperature);
142
+
results.put(station.name, res);
149
+
else if (b == ';') {
151
+
station = parseStation(lineStart, lineSplit, hash, buffer, stations);
155
+
else if (b == '-' && station != null) {
158
+
else if (b != '.' && station != null) {
159
+
temperature = temperature * 10 + (b - 0x30);
162
+
hash = 31 * hash + b;
167
+
catch (IOException e) {
168
+
throw new RuntimeException(e);
172
+
public static class Stat {
173
+
long min = Long.MAX_VALUE, max = Long.MIN_VALUE, total = 0, count = 0;
175
+
public void add(long value) {
184
+
public void merge(Stat value) {
185
+
if (value.min < min)
187
+
if (value.max > max)
189
+
total += value.total;
190
+
count += value.count;
193
+
private static double round(double value) {
194
+
return Math.round(value) / 10.0;
197
+
public String toString() {
198
+
return round((double) min) + "/" + round(((double) total) / count) + "/" + round((double) max);