From bc47ce1a53e693709e0005517c82f33c6880afe2 Mon Sep 17 00:00:00 2001 From: Dimitar Dimitrov <30328539+dimitarndimitrov@users.noreply.github.com> Date: Fri, 27 Sep 2024 09:49:10 -0500 Subject: [PATCH] MINOR: Fix a race and add JMH bench for HdrHistogram (#17221) --- build.gradle | 1 + checkstyle/import-control-jmh-benchmarks.xml | 2 + .../common/runtime/HdrHistogram.java | 26 ++-- .../common/runtime/HdrHistogramTest.java | 41 ++++++ .../kafka/jmh/metrics/HistogramBenchmark.java | 122 ++++++++++++++++++ 5 files changed, 179 insertions(+), 13 deletions(-) create mode 100644 jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metrics/HistogramBenchmark.java diff --git a/build.gradle b/build.gradle index 64b9d488365..0907f0506a4 100644 --- a/build.gradle +++ b/build.gradle @@ -3196,6 +3196,7 @@ project(':jmh-benchmarks') { implementation project(':server') implementation project(':raft') implementation project(':clients') + implementation project(':coordinator-common') implementation project(':group-coordinator') implementation project(':group-coordinator:group-coordinator-api') implementation project(':metadata') diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index 1612477ea00..65bfbb63373 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -29,6 +29,7 @@ + @@ -51,6 +52,7 @@ + diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java index 454a7aeb3b3..4b961d957cc 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java @@ -20,8 +20,6 @@ import org.HdrHistogram.Histogram; import org.HdrHistogram.Recorder; import org.HdrHistogram.ValueRecorder; -import java.util.concurrent.atomic.AtomicReference; - /** *

A wrapper on top of the HdrHistogram API. It handles writing to the histogram by delegating * to an internal {@link ValueRecorder} implementation, and reading from the histogram by @@ -35,6 +33,7 @@ public final class HdrHistogram { private static final long DEFAULT_MAX_SNAPSHOT_AGE_MS = 1000L; + private final Object lock = new Object(); /** * The duration (in millis) after which the latest histogram snapshot is considered outdated and * subsequent calls to {@link #latestHistogram(long)} will result in the snapshot being recreated. @@ -54,7 +53,7 @@ public final class HdrHistogram { * The latest snapshot of the internal HdrHistogram. Automatically updated by * {@link #latestHistogram(long)} if older than {@link #maxSnapshotAgeMs}. */ - private final AtomicReference> timestampedHistogramSnapshot; + private volatile Timestamped timestampedHistogramSnapshot; public HdrHistogram( long highestTrackableValue, @@ -70,19 +69,20 @@ public final class HdrHistogram { ) { this.maxSnapshotAgeMs = maxSnapshotAgeMs; recorder = new Recorder(highestTrackableValue, numberOfSignificantValueDigits); - this.timestampedHistogramSnapshot = new AtomicReference<>(new Timestamped<>(0, null)); + this.timestampedHistogramSnapshot = new Timestamped<>(0, null); } private Histogram latestHistogram(long now) { - Timestamped latest = timestampedHistogramSnapshot.get(); - while (now - latest.timestamp > maxSnapshotAgeMs) { - Histogram currentSnapshot = recorder.getIntervalHistogram(); - boolean updatedLatest = timestampedHistogramSnapshot.compareAndSet( - latest, new Timestamped<>(now, currentSnapshot)); - - latest = timestampedHistogramSnapshot.get(); - if (updatedLatest) { - break; + Timestamped latest = timestampedHistogramSnapshot; + if (now - latest.timestamp > maxSnapshotAgeMs) { + // Double-checked locking ensures that the thread that extracts the histogram data is + // the one that updates the internal snapshot. + synchronized (lock) { + latest = timestampedHistogramSnapshot; + if (now - latest.timestamp > maxSnapshotAgeMs) { + latest = new Timestamped<>(now, recorder.getIntervalHistogram()); + timestampedHistogramSnapshot = latest; + } } } return latest.value; diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java index ad4203310d8..7703f11c81c 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.coordinator.common.runtime; +import org.apache.kafka.common.utils.ThreadUtils; + import com.yammer.metrics.core.Histogram; import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.MetricsRegistry; @@ -25,8 +27,12 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -172,4 +178,39 @@ public class HdrHistogramTest { assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + maxSnapshotAgeMs)); assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + maxSnapshotAgeMs)); } + + @Test + public void testLatestHistogramRace() throws InterruptedException, ExecutionException { + long maxSnapshotAgeMs = 10L; + long now = System.currentTimeMillis(); + HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, MAX_VALUE, 1); + ExecutorService countExecutor = Executors.newFixedThreadPool(2); + for (int i = 1; i < 10000; i++) { + int numEvents = 2; + for (int j = 0; j < numEvents; j++) { + hdrHistogram.record(i); + } + final long moreThanMaxAge = now + maxSnapshotAgeMs + 1; + now = moreThanMaxAge; + CountDownLatch latch = new CountDownLatch(1); + Callable countTask = () -> { + try { + assertTrue(latch.await(500, TimeUnit.MILLISECONDS)); + return hdrHistogram.count(moreThanMaxAge); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }; + Future t1Future = countExecutor.submit(countTask); + Future t2Future = countExecutor.submit(countTask); + latch.countDown(); + long t1Count = t1Future.get(); + long t2Count = t2Future.get(); + assertTrue( + numEvents == t1Count && numEvents == t2Count, + String.format("Expected %d events in both threads, got %d in T1 and %d in T2", + numEvents, t1Count, t2Count)); + } + ThreadUtils.shutdownExecutorServiceQuietly(countExecutor, 500, TimeUnit.MILLISECONDS); + } } diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metrics/HistogramBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metrics/HistogramBenchmark.java new file mode 100644 index 00000000000..d5f65eaf134 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metrics/HistogramBenchmark.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.jmh.metrics; + +import org.apache.kafka.coordinator.common.runtime.HdrHistogram; +import org.apache.kafka.server.metrics.KafkaYammerMetrics; + +import com.yammer.metrics.core.Histogram; +import com.yammer.metrics.core.MetricName; + +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Group; +import org.openjdk.jmh.annotations.GroupThreads; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Threads(10) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class HistogramBenchmark { + + /* + * This benchmark compares the performance of the most commonly used in the Kafka codebase + * Yammer histogram and the new HdrHistogram. It does it by focusing on the write path in a + * multiple writers, multiple readers scenario. + * + * The benchmark relies on JMH Groups which allows us to distribute the number of worker threads + * to the different benchmark methods. + */ + + private static final long MAX_VALUE = TimeUnit.MINUTES.toMillis(1L); + + private Histogram yammerHistogram; + private HdrHistogram hdrHistogram; + + @Setup(Level.Trial) + public void setUp() { + yammerHistogram = KafkaYammerMetrics.defaultRegistry().newHistogram(new MetricName("a", "", ""), true); + hdrHistogram = new HdrHistogram(MAX_VALUE, 3); + } + + /* + * The write benchmark methods below are the core of the benchmark. They use ThreadLocalRandom + * to generate values to record. This is much faster than the actual histogram recording, so + * the benchmark results are representative of the histogram implementation. + */ + + @Benchmark + @Group("runner") + @GroupThreads(3) + public void writeYammerHistogram() { + yammerHistogram.update(ThreadLocalRandom.current().nextLong(MAX_VALUE)); + } + + @Benchmark + @Group("runner") + @GroupThreads(3) + public void writeHdrHistogram() { + hdrHistogram.record(ThreadLocalRandom.current().nextLong(MAX_VALUE)); + } + + /* + * The read benchmark methods below are not real benchmark methods! + * They are there only to simulate the concurrent exercise of the read and the write paths in + * the histogram implementations (with the read path exercised significantly less often). The + * measurements for these benchmark methods should be ignored as although not optimized away + * (that's why the methods have a return value), they practically measure the cost of a + * System.currentTimeMillis() call. + */ + + @Benchmark + @Group("runner") + @GroupThreads(2) + public double readYammerHistogram() { + long now = System.currentTimeMillis(); + if (now % 199 == 0) { + return yammerHistogram.getSnapshot().get999thPercentile(); + } + return now; + } + + @Benchmark + @Group("runner") + @GroupThreads(2) + public double readHdrHistogram() { + long now = System.currentTimeMillis(); + if (now % 199 == 0) { + return hdrHistogram.measurePercentile(now, 99.9); + } + return now; + } +}