diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java
index 3293c0223f2..22863fef5fa 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/StreamsStickyAssignorBenchmark.java
@@ -44,7 +44,6 @@ import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -107,7 +106,7 @@ public class StreamsStickyAssignorBenchmark {
         taskAssignor = new StickyTaskAssignor();
 
         Map<String, AssignmentMemberSpec> members = createMembers();
-        this.assignmentConfigs = Collections.singletonMap(
+        this.assignmentConfigs = Map.of(
             "num.standby.replicas",
             Integer.toString(standbyReplicas)
         );
@@ -138,7 +137,7 @@ public class StreamsStickyAssignorBenchmark {
         for (Map.Entry<String, AssignmentMemberSpec> member : groupSpec.members().entrySet()) {
             MemberAssignment memberAssignment = members.getOrDefault(
                 member.getKey(),
-                new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())
+                new MemberAssignment(Map.of(), Map.of(), Map.of())
             );
 
             updatedMemberSpec.put(member.getKey(), new AssignmentMemberSpec(
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java
index 68f3a52918c..e0fd3d8c8d4 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/core/TestPurgatoryPerformance.java
@@ -30,7 +30,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
@@ -405,12 +404,7 @@ public class TestPurgatoryPerformance {
 
     }
 
-    private static class Scheduled implements Delayed {
-        final FakeOperation operation;
-
-        public Scheduled(FakeOperation operation) {
-            this.operation = operation;
-        }
+    private record Scheduled(FakeOperation operation) implements Delayed {
 
         @Override
         public long getDelay(TimeUnit unit) {
@@ -429,30 +423,11 @@
         }
     }
 
-    private static class FakeOperationKey implements DelayedOperationKey {
-        private final String key;
-
-        public FakeOperationKey(String key) {
-            this.key = key;
-        }
-
+    private record FakeOperationKey(String key) implements DelayedOperationKey {
         @Override
         public String keyLabel() {
             return key;
         }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            FakeOperationKey that = (FakeOperationKey) o;
-            return Objects.equals(key, that.key);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(key);
-        }
     }
 
     private static class FakeOperation extends DelayedOperation {
@@ -469,7 +444,6 @@
 
         @Override
         public void onExpiration() {
-
         }
 
         @Override
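
Two notes on the jmh-benchmarks changes above. Map.of() and Collections.singletonMap()/Collections.emptyMap() all return immutable maps, so the swap is behavior-preserving at these call sites (Map.of additionally rejects null keys and values, which never occur here). The record conversions rest on a language guarantee: a Java record derives its canonical constructor, accessors, equals, hashCode, and toString from its components, which is why FakeOperationKey's hand-written constructor, equals, and hashCode can be deleted outright. A minimal, self-contained sketch of that behavior (the Key record and demo class below are illustrative, not part of this patch):

    // A record with one component derives equals/hashCode/toString over that component.
    record Key(String key) {}

    public class RecordEqualityDemo {
        public static void main(String[] args) {
            Key a = new Key("topic-0");
            Key b = new Key("topic-0");
            System.out.println(a.equals(b));                  // true: component-wise equality
            System.out.println(a.hashCode() == b.hashCode()); // true: consistent with equals
            System.out.println(a);                            // Key[key=topic-0]
        }
    }
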
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index ef9f6f62a6e..4316f7345be 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -34,6 +34,7 @@ import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments
@@ -116,6 +117,10 @@ public class RemoteLogMetadataCache {
         return initializedLatch.getCount() == 0;
     }
 
+    boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
+        return initializedLatch.await(timeout, unit);
+    }
+
     /**
      * Returns {@link RemoteLogSegmentMetadata} if it exists for the given leader-epoch containing the offset and with
      * {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns {@link Optional#empty()}.
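
The new awaitInitialized method delegates to CountDownLatch.await(long, TimeUnit), which returns true as soon as markInitialized() counts the latch down to zero and false if the timeout elapses first, so callers can distinguish "initialized" from "still catching up" without blocking indefinitely. A standalone sketch of this gate pattern (the class and demo names below are illustrative, not this patch's API):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    class InitGate {
        private final CountDownLatch latch = new CountDownLatch(1);

        // Idempotent: counting down an already-zero latch is a no-op.
        void markInitialized() {
            latch.countDown();
        }

        // Returns true if initialized before the timeout, false on timeout.
        boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }
    }

    public class InitGateDemo {
        public static void main(String[] args) throws InterruptedException {
            InitGate gate = new InitGate();
            new Thread(gate::markInitialized).start();
            System.out.println(gate.awaitInitialized(1, TimeUnit.SECONDS)); // true once marked
        }
    }
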
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index 9c7568a4927..155b3a9c172 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 /**
  * This class represents a store to maintain the {@link RemotePartitionDeleteMetadata} and {@link RemoteLogMetadataCache} for each topic partition.
  */
@@ -123,8 +124,15 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
             throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
         }
         if (!remoteLogMetadataCache.isInitialized()) {
-            // Throwing a retriable ReplicaNotAvailableException here for clients retry.
-            throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
+            try {
+                boolean initialized = remoteLogMetadataCache.awaitInitialized(100, TimeUnit.MILLISECONDS);
+                if (!initialized) {
+                    // Throwing a retriable ReplicaNotAvailableException here for clients retry.
+                    throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
+                }
+            } catch (InterruptedException ex) {
+                throw new RemoteResourceNotFoundException("Couldn't initialize remote log metadata cache for partition: " + topicIdPartition);
+            }
         }
         return remoteLogMetadataCache;
     }
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
index 4cc7d90198d..6eabbcc8d11 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCacheTest.java
@@ -35,8 +35,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -146,6 +149,24 @@ public class RemoteLogMetadataCacheTest {
         updateAndVerifyCacheContents(updatedMetadata, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, leaderEpoch);
     }
 
+    @Test
+    public void testAwaitInitialized() throws InterruptedException {
+        CountDownLatch latch = new CountDownLatch(1);
+        assertFalse(cache.isInitialized());
+        Thread t = new Thread(() -> {
+            try {
+                cache.awaitInitialized(2000, TimeUnit.MILLISECONDS);
+                latch.countDown();
+            } catch (InterruptedException e) {
+                fail("Shouldn't throw InterruptedException");
+            }
+        });
+        t.start();
+        cache.markInitialized();
+        assertTrue(latch.await(1, TimeUnit.SECONDS));
+        assertTrue(cache.isInitialized());
+    }
+
     private void updateAndVerifyCacheContents(RemoteLogSegmentMetadataUpdate updatedMetadata,
                                               RemoteLogSegmentState expectedSegmentState,
                                               int leaderEpoch) throws RemoteResourceNotFoundException {
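
One caveat on the RemotePartitionMetadataStore change: the catch block translates InterruptedException into RemoteResourceNotFoundException without re-asserting the thread's interrupt status, so the interruption is swallowed. Standard practice is to call Thread.currentThread().interrupt() before translating the exception. A self-contained sketch of that pattern (the demo class is illustrative, not part of this patch):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class InterruptHandlingDemo {
        public static void main(String[] args) throws InterruptedException {
            CountDownLatch latch = new CountDownLatch(1); // never counted down, so await blocks
            Thread waiter = new Thread(() -> {
                try {
                    latch.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException ex) {
                    // Restore the interrupt flag before wrapping or translating the
                    // exception, so callers can still observe the interruption.
                    Thread.currentThread().interrupt();
                    System.out.println("flag restored: " + Thread.currentThread().isInterrupted());
                }
            });
            waiter.start();
            waiter.interrupt();
            waiter.join();
        }
    }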