KAFKA-12708 Rewrite org.apache.kafka.test.Microbenchmarks by JMH (#16231)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Kuan-Po (Cooper) Tseng 2024-06-14 16:47:34 +08:00 committed by GitHub
parent 8afa5e74ac
commit 888a177603
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 198 additions and 188 deletions

View File

@ -58,7 +58,7 @@
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/> files="(SaslServerAuthenticator|SaslAuthenticatorTest).java"/>
<suppress checks="NPath" <suppress checks="NPath"
files="(Microbenchmarks|SaslServerAuthenticator).java"/> files="(SaslServerAuthenticator).java"/>
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"
files="Errors.java"/> files="Errors.java"/>
<suppress checks="ClassFanOutComplexity" <suppress checks="ClassFanOutComplexity"

View File

@ -24,7 +24,11 @@ import java.util.Set;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
/** /**
* A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification * A simple read-optimized map implementation that synchronizes only writes and does a full copy on each modification.
* <p>
* Through the {@link org.apache.kafka.jmh.util.ConcurrentMapBenchmark}, we observed that in scenarios where
* write operations (i.e. computeIfAbsent) constitute 10%, the get performance of CopyOnWriteMap is lower compared to
* ConcurrentHashMap. However, when iterating over entrySet and values, CopyOnWriteMap performs better than ConcurrentHashMap.
*/ */
public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> { public class CopyOnWriteMap<K, V> implements ConcurrentMap<K, V> {

View File

@ -1,186 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.apache.kafka.common.utils.Time;
public class Microbenchmarks {

    /**
     * Runs a series of ad-hoc micro-benchmarks — sqrt, clock reads, random
     * numbers, binary search, synchronized vs. ReentrantLock contention, and
     * concurrent read throughput of HashMap/ConcurrentHashMap/CopyOnWriteMap —
     * printing the average per-iteration cost of each to stdout.
     *
     * @param args args[0] is the iteration count used by every benchmark
     * @throws Exception if interrupted while joining benchmark threads
     */
    public static void main(String[] args) throws Exception {
        final int iters = Integer.parseInt(args[0]);

        // Math.sqrt cost; accumulate into x so the JIT cannot eliminate the loop.
        double x = 0.0;
        long start = System.nanoTime();
        for (int i = 0; i < iters; i++)
            x += Math.sqrt(x);
        System.out.println(x);
        System.out.println("sqrt: " + (System.nanoTime() - start) / (double) iters);

        // Clock reads. Call each once untimed as a warm-up so JIT compilation
        // does not skew the measured pass; sum and print the results to defeat
        // dead-code elimination.
        systemMillis(iters);
        systemNanos(iters);
        long total = 0;
        start = System.nanoTime();
        total += systemMillis(iters);
        System.out.println("System.currentTimeMillis(): " + (System.nanoTime() - start) / iters);
        start = System.nanoTime();
        total += systemNanos(iters);
        System.out.println("System.nanoTime(): " + (System.nanoTime() - start) / iters);
        System.out.println(total);

        // Random number generation.
        int n = 0;
        Random random = new Random();
        start = System.nanoTime();
        for (int i = 0; i < iters; i++) {
            n += random.nextInt();
        }
        System.out.println(n);
        System.out.println("random: " + (System.nanoTime() - start) / iters);

        // Binary search over a small sorted float array.
        float[] floats = new float[1024];
        for (int i = 0; i < floats.length; i++)
            floats[i] = random.nextFloat();
        Arrays.sort(floats);
        int loc = 0;
        start = System.nanoTime();
        for (int i = 0; i < iters; i++)
            loc += Arrays.binarySearch(floats, floats[i % floats.length]);
        System.out.println(loc);
        System.out.println("binary search: " + (System.nanoTime() - start) / iters);

        // Contended synchronized block: t1 measures the cost of acquiring the
        // monitor while t2 keeps contending for it until t1 finishes.
        final Time time = Time.SYSTEM;
        final AtomicBoolean done = new AtomicBoolean(false);
        final Object lock = new Object();
        Thread t1 = new Thread(() -> {
            time.sleep(1);
            int counter = 0;
            long start1 = time.nanoseconds();
            for (int i = 0; i < iters; i++) {
                synchronized (lock) {
                    counter++;
                }
            }
            System.out.println("synchronized: " + ((time.nanoseconds() - start1) / iters));
            System.out.println(counter);
            done.set(true);
        });
        Thread t2 = new Thread(() -> {
            int counter = 0;
            while (!done.get()) {
                time.sleep(1);
                synchronized (lock) {
                    counter += 1;
                }
            }
            System.out.println("Counter: " + counter);
        });
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        // Same contention experiment with ReentrantLock instead of intrinsic
        // monitors.
        System.out.println("Testing locks");
        done.set(false);
        final ReentrantLock lock2 = new ReentrantLock();
        Thread t3 = new Thread(() -> {
            time.sleep(1);
            int counter = 0;
            long start12 = time.nanoseconds();
            for (int i = 0; i < iters; i++) {
                lock2.lock();
                counter++;
                lock2.unlock();
            }
            System.out.println("lock: " + ((time.nanoseconds() - start12) / iters));
            System.out.println(counter);
            done.set(true);
        });
        Thread t4 = new Thread(() -> {
            int counter = 0;
            while (!done.get()) {
                time.sleep(1);
                lock2.lock();
                counter++;
                lock2.unlock();
            }
            System.out.println("Counter: " + counter);
        });
        t3.start();
        t4.start();
        t3.join();
        t4.join();

        // Concurrent read throughput of the three map implementations, each
        // pre-loaded with the same 100 entries.
        Map<String, Integer> values = new HashMap<>();
        for (int i = 0; i < 100; i++)
            values.put(Integer.toString(i), i);
        System.out.println("HashMap:");
        benchMap(2, 1000000, values);
        System.out.println("ConcurrentHashMap:");
        benchMap(2, 1000000, new ConcurrentHashMap<>(values));
        System.out.println("CopyOnWriteMap:");
        benchMap(2, 1000000, new CopyOnWriteMap<>(values));
    }

    /**
     * Spawns {@code numThreads} threads that each perform {@code iters} reads
     * against {@code map} and print their average access time in nanoseconds.
     *
     * @throws Exception if interrupted while joining the reader threads
     */
    private static void benchMap(int numThreads, final int iters, final Map<String, Integer> map) throws Exception {
        final List<String> keys = new ArrayList<>(map.keySet());
        final List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < numThreads; i++) {
            threads.add(new Thread(() -> {
                long start = System.nanoTime();
                for (int j = 0; j < iters; j++)
                    // Bug fix: cycle through every key. The original used
                    // "j % threads.size()", which only ever read the first
                    // numThreads entries of a 100-entry map.
                    map.get(keys.get(j % keys.size()));
                System.out.println("Map access time: " + ((System.nanoTime() - start) / (double) iters));
            }));
        }
        for (Thread thread : threads)
            thread.start();
        for (Thread thread : threads)
            thread.join();
    }

    /**
     * Sums {@link System#currentTimeMillis()} over {@code iters} calls; the sum
     * is returned so the JIT cannot eliminate the clock reads.
     */
    private static long systemMillis(int iters) {
        long total = 0;
        for (int i = 0; i < iters; i++)
            total += System.currentTimeMillis();
        return total;
    }

    /**
     * Sums {@link System#nanoTime()} over {@code iters} calls; the sum is
     * returned so the JIT cannot eliminate the clock reads.
     */
    private static long systemNanos(int iters) {
        long total = 0;
        // Bug fix: this previously summed System.currentTimeMillis(), so the
        // "System.nanoTime()" benchmark in main() measured the wrong clock.
        for (int i = 0; i < iters; i++)
            total += System.nanoTime();
        return total;
    }
}

View File

@ -0,0 +1,192 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.util;
import org.apache.kafka.common.utils.CopyOnWriteMap;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * This benchmark compares the performance of CopyOnWriteMap and ConcurrentHashMap
 * under a low-write scenario. We use computeIfAbsent as the write operation,
 * since it is the only CopyOnWriteMap write operation used in the code base and
 * constitutes 10% of the operations. The benchmark tests the performance of
 * get, values, and entrySet methods, as these methods are also used in the code.
 */
@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 3)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@Threads(2)
public class ConcurrentMapBenchmark {
    // Number of map operations performed per benchmark invocation; scores are
    // normalized per operation via @OperationsPerInvocation(TIMES).
    private static final int TIMES = 100_000;

    // Number of entries (keys 0..mapSize-1, each mapped to itself) pre-loaded
    // into both maps by setup().
    @Param({"100"})
    private int mapSize;

    // Fraction of the TIMES operations that are writes (computeIfAbsent).
    @Param({"0.1"})
    private double writePercentage;

    private Map<Integer, Integer> concurrentHashMap;
    private Map<Integer, Integer> copyOnWriteMap;

    // Loop period between writes: TIMES / round(writePercentage * TIMES).
    // With the defaults this is 100_000 / 10_000 = 10, i.e. every 10th
    // iteration of a benchmark loop performs a write.
    private int writePerLoops;

    // Level.Invocation rebuilds both maps before every benchmark invocation so
    // the extra entries added by computeIfAbsent do not accumulate across
    // invocations. NOTE(review): JMH documents Invocation-level setup as
    // potentially perturbing timing; acceptable here since each invocation
    // does TIMES operations, but worth confirming.
    @Setup(Level.Invocation)
    public void setup() {
        Map<Integer, Integer> mapTemplate = IntStream.range(0, mapSize).boxed()
                .collect(Collectors.toMap(i -> i, i -> i));
        concurrentHashMap = new ConcurrentHashMap<>(mapTemplate);
        copyOnWriteMap = new CopyOnWriteMap<>(mapTemplate);
        writePerLoops = TIMES / (int) Math.round(writePercentage * TIMES);
    }

    // Mixed workload: sequential get() hits on ConcurrentHashMap with a write
    // every writePerLoops iterations.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testConcurrentHashMapGet(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                blackhole.consume(concurrentHashMap.get(i % mapSize));
            }
        }
    }

    // Same as testConcurrentHashMapGet but with randomized get() keys.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testConcurrentHashMapGetRandom(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                // NOTE(review): the exclusive bound mapSize + 1 means key ==
                // mapSize can be drawn; it exists only after the i == 0 write
                // of this loop. Confirm the inclusive upper key is intended.
                blackhole.consume(concurrentHashMap.get(ThreadLocalRandom.current().nextInt(0, mapSize + 1)));
            }
        }
    }

    // Mixed workload: sequential get() hits on CopyOnWriteMap with a write
    // every writePerLoops iterations.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testCopyOnWriteMapGet(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                blackhole.consume(copyOnWriteMap.get(i % mapSize));
            }
        }
    }

    // Same as testCopyOnWriteMapGet but with randomized get() keys.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testCopyOnWriteMapGetRandom(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                // NOTE(review): same mapSize + 1 bound as the
                // ConcurrentHashMap variant — confirm intended.
                blackhole.consume(copyOnWriteMap.get(ThreadLocalRandom.current().nextInt(0, mapSize + 1)));
            }
        }
    }

    // Mixed workload: full values() iteration on ConcurrentHashMap, with a
    // write every writePerLoops iterations.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testConcurrentHashMapValues(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                for (int value : concurrentHashMap.values()) {
                    blackhole.consume(value);
                }
            }
        }
    }

    // Mixed workload: full values() iteration on CopyOnWriteMap, with a write
    // every writePerLoops iterations.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testCopyOnWriteMapValues(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                for (int value : copyOnWriteMap.values()) {
                    blackhole.consume(value);
                }
            }
        }
    }

    // Mixed workload: full entrySet() iteration on ConcurrentHashMap, with a
    // write every writePerLoops iterations.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testConcurrentHashMapEntrySet(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                concurrentHashMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                for (Map.Entry<Integer, Integer> entry : concurrentHashMap.entrySet()) {
                    blackhole.consume(entry);
                }
            }
        }
    }

    // Mixed workload: full entrySet() iteration on CopyOnWriteMap, with a
    // write every writePerLoops iterations.
    @Benchmark
    @OperationsPerInvocation(TIMES)
    public void testCopyOnWriteMapEntrySet(Blackhole blackhole) {
        for (int i = 0; i < TIMES; i++) {
            if (i % writePerLoops == 0) {
                // add offset mapSize to ensure computeIfAbsent does add a new entry
                copyOnWriteMap.computeIfAbsent(i + mapSize, key -> key);
            } else {
                for (Map.Entry<Integer, Integer> entry : copyOnWriteMap.entrySet()) {
                    blackhole.consume(entry);
                }
            }
        }
    }
}