From a056672f7cb1f13aac62a20c9ce46a632029c19f Mon Sep 17 00:00:00 2001 From: Kamal Chandraprakash Date: Wed, 20 Aug 2025 09:48:57 +0530 Subject: [PATCH 1/2] KAFKA-19599: Reduce the frequency of ReplicaNotAvailableException thrown to clients when RLMM is not ready (#20345) During broker restarts, the topic-based RemoteLogMetadataManager (RLMM) constructs the state by reading the internal `__remote_log_metadata` topic. When the partition is not ready to perform remote storage operations, then ReplicaNotAvailableException thrown back to the consumer. The clients retries the request immediately. This results in a lot of FETCH requests on the broker and utilizes the request handler threads. Using the CountdownLatch to reduce the frequency of ReplicaNotAvailableException thrown back to the clients. This will improve the request handler thread usage on the broker. Previously for one consumer, when RLMM is not ready for a partition, then ~9K FetchConsumer requests / sec are received on the broker. With this patch, the number of FETCH requests reduced by 95% to 600 / sec. Reviewers: Lan Ding , Satish Duggana --- .../storage/RemoteLogMetadataCache.java | 5 +++++ .../storage/RemotePartitionMetadataStore.java | 12 +++++++++-- .../storage/RemoteLogMetadataCacheTest.java | 21 +++++++++++++++++++ 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java index ef9f6f62a6e..4316f7345be 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java @@ -34,6 +34,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; /** * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments @@ -116,6 +117,10 @@ public class RemoteLogMetadataCache { return initializedLatch.getCount() == 0; } + boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException { + return initializedLatch.await(timeout, unit); + } + /** * Returns {@link RemoteLogSegmentMetadata} if it exists for the given leader-epoch containing the offset and with * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns {@link Optional#empty()}. diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java index 9c7568a4927..155b3a9c172 100644 --- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java +++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java @@ -36,6 +36,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; /** * This class represents a store to maintain the {@link RemotePartitionDeleteMetadata} and {@link RemoteLogMetadataCache} for each topic partition. @@ -123,8 +124,15 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition); } if (!remoteLogMetadataCache.isInitialized()) { - // Throwing a retriable ReplicaNotAvailableException here for clients retry. - throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition); + try { + boolean initialized = remoteLogMetadataCache.awaitInitialized(100, TimeUnit.MILLISECONDS); + if (!initialized) { + // Throwing a retriable ReplicaNotAvailableException here for clients retry. + throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition); + } + } catch (InterruptedException ex) { + throw new RemoteResourceNotFoundException("Couldn't initialize remote log metadata cache for partition: " + topicIdPartition); + } } return remoteLogMetadataCache; } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java index 4cc7d90198d..6eabbcc8d11 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java @@ -35,8 +35,11 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -146,6 +149,24 @@ public class RemoteLogMetadataCacheTest { updateAndVerifyCacheContents(updatedMetadata, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, leaderEpoch); } + @Test + public void testAwaitInitialized() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + assertFalse(cache.isInitialized()); + Thread t = new Thread(() -> { + try { + cache.awaitInitialized(2000, TimeUnit.MILLISECONDS); + latch.countDown(); + } catch (InterruptedException e) { + fail("Shouldn't throw InterruptedException"); + } + }); + t.start(); + cache.markInitialized(); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertTrue(cache.isInitialized()); + } + private void updateAndVerifyCacheContents(RemoteLogSegmentMetadataUpdate updatedMetadata, RemoteLogSegmentState expectedSegmentState, int leaderEpoch) throws RemoteResourceNotFoundException { From 0202721b4c175c606c3d079e602fe33e8faea018 Mon Sep 17 00:00:00 2001 From: Sanskar Jhajharia Date: Wed, 20 Aug 2025 19:50:54 +0530 Subject: [PATCH 2/2] MINOR: Cleanups in jmh-benchmarks module (#20374) This PR aims at cleaning up the `jmh-benchmarks` module further by getting rid of some extra code which can be replaced by record Reviewers: Ken Huang , Chia-Ping Tsai --- .../StreamsStickyAssignorBenchmark.java | 5 ++-- .../jmh/core/TestPurgatoryPerformance.java | 30 ++----------------- 2 files changed, 4 insertions(+), 31 deletions(-) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java index 3293c0223f2..22863fef5fa 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java @@ -44,7 +44,6 @@ import org.openjdk.jmh.annotations.Threads; import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.infra.Blackhole; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -107,7 +106,7 @@ public class StreamsStickyAssignorBenchmark { taskAssignor = new StickyTaskAssignor(); Map members = createMembers(); - this.assignmentConfigs = Collections.singletonMap( + this.assignmentConfigs = Map.of( "num.standby.replicas", Integer.toString(standbyReplicas) ); @@ -138,7 +137,7 @@ public class StreamsStickyAssignorBenchmark { for (Map.Entry member : groupSpec.members().entrySet()) { MemberAssignment memberAssignment = members.getOrDefault( member.getKey(), - new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()) + new MemberAssignment(Map.of(), Map.of(), Map.of()) ); updatedMemberSpec.put(member.getKey(), new AssignmentMemberSpec( diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java index 68f3a52918c..e0fd3d8c8d4 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java @@ -30,7 +30,6 @@ import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.Objects; import java.util.Optional; import java.util.Random; import java.util.concurrent.CountDownLatch; @@ -405,12 +404,7 @@ public class TestPurgatoryPerformance { } - private static class Scheduled implements Delayed { - final FakeOperation operation; - - public Scheduled(FakeOperation operation) { - this.operation = operation; - } + private record Scheduled(FakeOperation operation) implements Delayed { @Override public long getDelay(TimeUnit unit) { @@ -429,30 +423,11 @@ public class TestPurgatoryPerformance { } } - private static class FakeOperationKey implements DelayedOperationKey { - private final String key; - - public FakeOperationKey(String key) { - this.key = key; - } - + private record FakeOperationKey(String key) implements DelayedOperationKey { @Override public String keyLabel() { return key; } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - FakeOperationKey that = (FakeOperationKey) o; - return Objects.equals(key, that.key); - } - - @Override - public int hashCode() { - return Objects.hash(key); - } } private static class FakeOperation extends DelayedOperation { @@ -469,7 +444,6 @@ public class TestPurgatoryPerformance { @Override public void onExpiration() { - } @Override