MINOR: Fix a race and add JMH bench for HdrHistogram (#17221)

This commit is contained in:
Dimitar Dimitrov 2024-09-27 09:49:10 -05:00 committed by GitHub
parent e1deeb4b91
commit bc47ce1a53
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 179 additions and 13 deletions

View File

@ -3196,6 +3196,7 @@ project(':jmh-benchmarks') {
implementation project(':server')
implementation project(':raft')
implementation project(':clients')
implementation project(':coordinator-common')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation project(':metadata')

View File

@ -29,6 +29,7 @@
<allow pkg="java.security"/>
<allow pkg="javax.net.ssl"/>
<allow pkg="javax.security"/>
<allow pkg="com.yammer.metrics.core"/>
<allow pkg="org.apache.kafka.common"/>
<allow pkg="org.apache.kafka.clients.producer"/>
<allow pkg="kafka.cluster"/>
@ -51,6 +52,7 @@
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage"/>
<allow pkg="org.apache.kafka.clients"/>
<allow class="org.apache.kafka.coordinator.common.runtime.HdrHistogram"/>
<allow pkg="org.apache.kafka.coordinator.group"/>
<allow pkg="org.apache.kafka.image"/>
<allow pkg="org.apache.kafka.metadata"/>

View File

@ -20,8 +20,6 @@ import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.HdrHistogram.ValueRecorder;
import java.util.concurrent.atomic.AtomicReference;
/**
* <p>A wrapper on top of the HdrHistogram API. It handles writing to the histogram by delegating
* to an internal {@link ValueRecorder} implementation, and reading from the histogram by
@ -35,6 +33,7 @@ public final class HdrHistogram {
private static final long DEFAULT_MAX_SNAPSHOT_AGE_MS = 1000L;
private final Object lock = new Object();
/**
* The duration (in millis) after which the latest histogram snapshot is considered outdated and
* subsequent calls to {@link #latestHistogram(long)} will result in the snapshot being recreated.
@ -54,7 +53,7 @@ public final class HdrHistogram {
* The latest snapshot of the internal HdrHistogram. Automatically updated by
* {@link #latestHistogram(long)} if older than {@link #maxSnapshotAgeMs}.
*/
private final AtomicReference<Timestamped<Histogram>> timestampedHistogramSnapshot;
private volatile Timestamped<Histogram> timestampedHistogramSnapshot;
public HdrHistogram(
long highestTrackableValue,
@ -70,19 +69,20 @@ public final class HdrHistogram {
) {
this.maxSnapshotAgeMs = maxSnapshotAgeMs;
recorder = new Recorder(highestTrackableValue, numberOfSignificantValueDigits);
this.timestampedHistogramSnapshot = new AtomicReference<>(new Timestamped<>(0, null));
this.timestampedHistogramSnapshot = new Timestamped<>(0, null);
}
private Histogram latestHistogram(long now) {
Timestamped<Histogram> latest = timestampedHistogramSnapshot.get();
while (now - latest.timestamp > maxSnapshotAgeMs) {
Histogram currentSnapshot = recorder.getIntervalHistogram();
boolean updatedLatest = timestampedHistogramSnapshot.compareAndSet(
latest, new Timestamped<>(now, currentSnapshot));
latest = timestampedHistogramSnapshot.get();
if (updatedLatest) {
break;
Timestamped<Histogram> latest = timestampedHistogramSnapshot;
if (now - latest.timestamp > maxSnapshotAgeMs) {
// Double-checked locking ensures that the thread that extracts the histogram data is
// the one that updates the internal snapshot.
synchronized (lock) {
latest = timestampedHistogramSnapshot;
if (now - latest.timestamp > maxSnapshotAgeMs) {
latest = new Timestamped<>(now, recorder.getIntervalHistogram());
timestampedHistogramSnapshot = latest;
}
}
}
return latest.value;

View File

@ -16,6 +16,8 @@
*/
package org.apache.kafka.coordinator.common.runtime;
import org.apache.kafka.common.utils.ThreadUtils;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
@ -25,8 +27,12 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@ -172,4 +178,39 @@ public class HdrHistogramTest {
assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs));
assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs));
}
@Test
public void testLatestHistogramRace() throws InterruptedException, ExecutionException {
long maxSnapshotAgeMs = 10L;
long now = System.currentTimeMillis();
HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1);
ExecutorService countExecutor = Executors.newFixedThreadPool(2);
for (int i = 1; i < 10000; i++) {
int numEvents = 2;
for (int j = 0; j < numEvents; j++) {
hdrHistogram.record(i);
}
final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
now = moreThanMaxAge;
CountDownLatch latch = new CountDownLatch(1);
Callable<Long> countTask = () -> {
try {
assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
return hdrHistogram.count(moreThanMaxAge);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
Future<Long> t1Future = countExecutor.submit(countTask);
Future<Long> t2Future = countExecutor.submit(countTask);
latch.countDown();
long t1Count = t1Future.get();
long t2Count = t2Future.get();
assertTrue(
numEvents == t1Count && numEvents == t2Count,
String.format("Expected %d events in both threads, got %d in T1 and %d in T2",
numEvents, t1Count, t2Count));
}
ThreadUtils.shutdownExecutorServiceQuietly(countExecutor, 500, TimeUnit.MILLISECONDS);
}
}

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.jmh.metrics;
import org.apache.kafka.coordinator.common.runtime.HdrHistogram;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Group;
import org.openjdk.jmh.annotations.GroupThreads;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@State(Scope.Benchmark)
@Fork(value = 1)
@Threads(10)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public class HistogramBenchmark {
/*
* This benchmark compares the performance of the most commonly used in the Kafka codebase
* Yammer histogram and the new HdrHistogram. It does it by focusing on the write path in a
* multiple writers, multiple readers scenario.
*
* The benchmark relies on JMH Groups which allows us to distribute the number of worker threads
* to the different benchmark methods.
*/
private static final long MAX_VALUE = TimeUnit.MINUTES.toMillis(1L);
private Histogram yammerHistogram;
private HdrHistogram hdrHistogram;
@Setup(Level.Trial)
public void setUp() {
yammerHistogram = KafkaYammerMetrics.defaultRegistry().newHistogram(new MetricName("a", "", ""), true);
hdrHistogram = new HdrHistogram(MAX_VALUE, 3);
}
/*
* The write benchmark methods below are the core of the benchmark. They use ThreadLocalRandom
* to generate values to record. This is much faster than the actual histogram recording, so
* the benchmark results are representative of the histogram implementation.
*/
@Benchmark
@Group("runner")
@GroupThreads(3)
public void writeYammerHistogram() {
yammerHistogram.update(ThreadLocalRandom.current().nextLong(MAX_VALUE));
}
@Benchmark
@Group("runner")
@GroupThreads(3)
public void writeHdrHistogram() {
hdrHistogram.record(ThreadLocalRandom.current().nextLong(MAX_VALUE));
}
/*
* The read benchmark methods below are not real benchmark methods!
* They are there only to simulate the concurrent exercise of the read and the write paths in
* the histogram implementations (with the read path exercised significantly less often). The
* measurements for these benchmark methods should be ignored as although not optimized away
* (that's why the methods have a return value), they practically measure the cost of a
* System.currentTimeMillis() call.
*/
@Benchmark
@Group("runner")
@GroupThreads(2)
public double readYammerHistogram() {
long now = System.currentTimeMillis();
if (now % 199 == 0) {
return yammerHistogram.getSnapshot().get999thPercentile();
}
return now;
}
@Benchmark
@Group("runner")
@GroupThreads(2)
public double readHdrHistogram() {
long now = System.currentTimeMillis();
if (now % 199 == 0) {
return hdrHistogram.measurePercentile(now, 99.9);
}
return now;
}
}