mirror of https://github.com/apache/kafka.git
Merge branch 'apache:trunk' into KAFKA-17103
This commit is contained in:
commit
f9703f7e8b
|
@ -44,7 +44,6 @@ import org.openjdk.jmh.annotations.Threads;
|
|||
import org.openjdk.jmh.annotations.Warmup;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -107,7 +106,7 @@ public class StreamsStickyAssignorBenchmark {
|
|||
taskAssignor = new StickyTaskAssignor();
|
||||
|
||||
Map<String, StreamsGroupMember> members = createMembers();
|
||||
this.assignmentConfigs = Collections.singletonMap(
|
||||
this.assignmentConfigs = Map.of(
|
||||
"num.standby.replicas",
|
||||
Integer.toString(standbyReplicas)
|
||||
);
|
||||
|
@ -138,7 +137,7 @@ public class StreamsStickyAssignorBenchmark {
|
|||
for (Map.Entry<String, AssignmentMemberSpec> member : groupSpec.members().entrySet()) {
|
||||
MemberAssignment memberAssignment = members.getOrDefault(
|
||||
member.getKey(),
|
||||
new MemberAssignment(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap())
|
||||
new MemberAssignment(Map.of(), Map.of(), Map.of())
|
||||
);
|
||||
|
||||
updatedMemberSpec.put(member.getKey(), new AssignmentMemberSpec(
|
||||
|
|
|
@ -30,7 +30,6 @@ import java.lang.reflect.InvocationTargetException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
@ -405,12 +404,7 @@ public class TestPurgatoryPerformance {
|
|||
|
||||
}
|
||||
|
||||
private static class Scheduled implements Delayed {
|
||||
final FakeOperation operation;
|
||||
|
||||
public Scheduled(FakeOperation operation) {
|
||||
this.operation = operation;
|
||||
}
|
||||
private record Scheduled(FakeOperation operation) implements Delayed {
|
||||
|
||||
@Override
|
||||
public long getDelay(TimeUnit unit) {
|
||||
|
@ -429,30 +423,11 @@ public class TestPurgatoryPerformance {
|
|||
}
|
||||
}
|
||||
|
||||
private static class FakeOperationKey implements DelayedOperationKey {
|
||||
private final String key;
|
||||
|
||||
public FakeOperationKey(String key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
private record FakeOperationKey(String key) implements DelayedOperationKey {
|
||||
@Override
|
||||
public String keyLabel() {
|
||||
return key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
FakeOperationKey that = (FakeOperationKey) o;
|
||||
return Objects.equals(key, that.key);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(key);
|
||||
}
|
||||
}
|
||||
|
||||
private static class FakeOperation extends DelayedOperation {
|
||||
|
@ -469,7 +444,6 @@ public class TestPurgatoryPerformance {
|
|||
|
||||
@Override
|
||||
public void onExpiration() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.Optional;
|
|||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* This class provides an in-memory cache of remote log segment metadata. This maintains the lineage of segments
|
||||
|
@ -116,6 +117,10 @@ public class RemoteLogMetadataCache {
|
|||
return initializedLatch.getCount() == 0;
|
||||
}
|
||||
|
||||
boolean awaitInitialized(long timeout, TimeUnit unit) throws InterruptedException {
|
||||
return initializedLatch.await(timeout, unit);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@link RemoteLogSegmentMetadata} if it exists for the given leader-epoch containing the offset and with
|
||||
* {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED} state, else returns {@link Optional#empty()}.
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.util.Map;
|
|||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* This class represents a store to maintain the {@link RemotePartitionDeleteMetadata} and {@link RemoteLogMetadataCache} for each topic partition.
|
||||
|
@ -123,8 +124,15 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
|
|||
throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
|
||||
}
|
||||
if (!remoteLogMetadataCache.isInitialized()) {
|
||||
// Throwing a retriable ReplicaNotAvailableException here for clients retry.
|
||||
throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
|
||||
try {
|
||||
boolean initialized = remoteLogMetadataCache.awaitInitialized(100, TimeUnit.MILLISECONDS);
|
||||
if (!initialized) {
|
||||
// Throwing a retriable ReplicaNotAvailableException here for clients retry.
|
||||
throw new ReplicaNotAvailableException("Remote log metadata cache is not initialized for partition: " + topicIdPartition);
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
throw new RemoteResourceNotFoundException("Couldn't initialize remote log metadata cache for partition: " + topicIdPartition);
|
||||
}
|
||||
}
|
||||
return remoteLogMetadataCache;
|
||||
}
|
||||
|
|
|
@ -35,8 +35,11 @@ import java.util.ArrayList;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
@ -146,6 +149,24 @@ public class RemoteLogMetadataCacheTest {
|
|||
updateAndVerifyCacheContents(updatedMetadata, RemoteLogSegmentState.DELETE_SEGMENT_FINISHED, leaderEpoch);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAwaitInitialized() throws InterruptedException {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
assertFalse(cache.isInitialized());
|
||||
Thread t = new Thread(() -> {
|
||||
try {
|
||||
cache.awaitInitialized(2000, TimeUnit.MILLISECONDS);
|
||||
latch.countDown();
|
||||
} catch (InterruptedException e) {
|
||||
fail("Shouldn't throw InterruptedException");
|
||||
}
|
||||
});
|
||||
t.start();
|
||||
cache.markInitialized();
|
||||
assertTrue(latch.await(1, TimeUnit.SECONDS));
|
||||
assertTrue(cache.isInitialized());
|
||||
}
|
||||
|
||||
private void updateAndVerifyCacheContents(RemoteLogSegmentMetadataUpdate updatedMetadata,
|
||||
RemoteLogSegmentState expectedSegmentState,
|
||||
int leaderEpoch) throws RemoteResourceNotFoundException {
|
||||
|
|
Loading…
Reference in New Issue