KAFKA-16785 Migrate TopicBasedRemoteLogMetadataManagerRestartTest to new test infra (#16170)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Author: Kuan-Po (Cooper) Tseng, 2024-06-02 22:18:53 +08:00 (committed by GitHub)
parent 9eb05fc729
commit b05f82d444
2 changed files with 60 additions and 102 deletions
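
For context, the "new test infra" in the title is the kafka.test JUnit 5 extension: instead of subclassing a harness, a test declares @ClusterTest and receives the running cluster through constructor injection. A minimal sketch of the pattern, using only names that appear in the diff below (the class ExampleClusterTest and its body are illustrative, not part of this commit):

import kafka.test.ClusterInstance;
import kafka.test.annotation.ClusterTest;
import kafka.test.junit.ClusterTestExtensions;
import org.apache.kafka.clients.admin.Admin;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value = ClusterTestExtensions.class)
@Tag("integration")
public class ExampleClusterTest {
    private final ClusterInstance clusterInstance;

    // The extension injects the running cluster via the constructor.
    ExampleClusterTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    // Replaces @Test plus hand-rolled setup/teardown; brings up a 3-broker cluster.
    @ClusterTest(brokers = 3)
    public void testSomething() throws Exception {
        try (Admin admin = clusterInstance.createAdminClient()) {
            // Interact with the cluster, e.g. create topics via admin.
        }
    }
}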

TopicBasedRemoteLogMetadataManagerHarness.java

@@ -53,12 +53,6 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
         initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, RemoteLogMetadataTopicPartitioner::new, remotePartitionMetadataStoreSupplier);
     }
 
-    public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
-                                                   boolean startConsumerThread,
-                                                   Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner) {
-        initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new);
-    }
-
     public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
                                                    boolean startConsumerThread,
                                                    Function<Integer, RemoteLogMetadataTopicPartitioner> remoteLogMetadataTopicPartitioner,
@@ -70,6 +64,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
                 .startConsumerThread(startConsumerThread)
                 .remoteLogMetadataTopicPartitioner(remoteLogMetadataTopicPartitioner)
                 .remotePartitionMetadataStore(remotePartitionMetadataStoreSupplier)
+                .overrideRemoteLogMetadataManagerProps(overrideRemoteLogMetadataManagerProps())
                 .build();
     }
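
The harness hunks above remove the three-argument initializeRemoteLogMetadataManager overload and thread the overrideRemoteLogMetadataManagerProps() hook into the builder, so a subclass overriding that hook still gets its properties applied. A minimal sketch of such a subclass (hypothetical caller; the log-dir value is only an example):

// Hypothetical subclass: the overridden hook now reaches the manager through
// .overrideRemoteLogMetadataManagerProps(...) in the builder chain above.
TopicBasedRemoteLogMetadataManagerHarness harness = new TopicBasedRemoteLogMetadataManagerHarness() {
    @Override
    protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
        return Collections.singletonMap(LOG_DIR, "/tmp/rlmm-segs"); // example value
    }
};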

TopicBasedRemoteLogMetadataManagerRestartTest.java

@@ -16,6 +16,11 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.junit.ClusterTestExtensions;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
@@ -24,139 +29,97 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.test.TestUtils;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import scala.collection.JavaConverters;
-import scala.collection.Seq;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.extension.ExtendWith;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 
-@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
+@ExtendWith(value = ClusterTestExtensions.class)
+@Tag("integration")
 public class TopicBasedRemoteLogMetadataManagerRestartTest {
 
     private static final int SEG_SIZE = 1024 * 1024;
 
     private final Time time = new MockTime(1);
     private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
+    private final ClusterInstance clusterInstance;
 
-    private TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness;
-
-    @BeforeEach
-    public void setup() {
-        // Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
-        remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness() {
-            protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
-                Map<String, Object> props = new HashMap<>();
-                props.put(LOG_DIR, logDir);
-                return props;
-            }
-        };
-        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true);
+    TopicBasedRemoteLogMetadataManagerRestartTest(ClusterInstance clusterInstance) { // Constructor injections
+        this.clusterInstance = clusterInstance;
     }
 
-    private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) {
-        remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, RemoteLogMetadataTopicPartitioner::new);
+    private TopicBasedRemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() {
+        return RemoteLogMetadataManagerTestUtils.builder()
+                .topicIdPartitions(Collections.emptySet())
+                .bootstrapServers(clusterInstance.bootstrapServers())
+                .startConsumerThread(true)
+                .remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new)
+                .overrideRemoteLogMetadataManagerProps(Collections.singletonMap(LOG_DIR, logDir))
+                .build();
     }
 
-    @AfterEach
-    public void teardown() throws IOException {
-        if (remoteLogMetadataManagerHarness != null) {
-            remoteLogMetadataManagerHarness.close();
-        }
-    }
-
-    private void stopTopicBasedRemoteLogMetadataManagerHarness() {
-        remoteLogMetadataManagerHarness.closeRemoteLogMetadataManager();
-    }
-
-    private TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
-        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
-    }
-
-    @Test
+    @ClusterTest(brokers = 3)
     public void testRLMMAPIsAfterRestart() throws Exception {
         // Create topics.
         String leaderTopic = "new-leader";
-        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>();
-        List<Object> leaderTopicReplicas = new ArrayList<>();
-        // Set broker id 0 as the first entry which is taken as the leader.
-        leaderTopicReplicas.add(0);
-        leaderTopicReplicas.add(1);
-        leaderTopicReplicas.add(2);
-        assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
-        remoteLogMetadataManagerHarness.createTopicWithAssignment(
-            leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas),
-            remoteLogMetadataManagerHarness.listenerName());
-
         String followerTopic = "new-follower";
-        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>();
-        List<Object> followerTopicReplicas = new ArrayList<>();
-        // Set broker id 1 as the first entry which is taken as the leader.
-        followerTopicReplicas.add(1);
-        followerTopicReplicas.add(2);
-        followerTopicReplicas.add(0);
-        assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
-        remoteLogMetadataManagerHarness.createTopicWithAssignment(followerTopic,
-            JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas),
-            remoteLogMetadataManagerHarness.listenerName());
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            // Set broker id 0 as the first entry which is taken as the leader.
+            NewTopic newLeaderTopic = new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2)));
+            // Set broker id 1 as the first entry which is taken as the leader.
+            NewTopic newFollowerTopic = new NewTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0)));
+            admin.createTopics(Arrays.asList(newLeaderTopic, newFollowerTopic));
+        }
 
         final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
         final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
 
-        // Register these partitions to RLMM.
-        topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
-
-        // Add segments for these partitions, but they are not available as they have not yet been subscribed.
         RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
                 0, 100, -1L, 0,
                 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-        topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get();
         RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(
                 new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
                 0, 100, -1L, 0,
                 time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
-        topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get();
 
-        // Stop TopicBasedRemoteLogMetadataManager only.
-        stopTopicBasedRemoteLogMetadataManagerHarness();
+        try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) {
+            // Register these partitions to RemoteLogMetadataManager.
+            topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(
+                    Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
-        // Start TopicBasedRemoteLogMetadataManager
-        startTopicBasedRemoteLogMetadataManagerHarness(true);
+            // Add segments for these partitions, but they are not available as they have not yet been subscribed.
+            topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get();
+            topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get();
+        }
 
-        // Register these partitions to RLMM, which loads the respective metadata snapshots.
-        topicBasedRlmm().onPartitionLeadershipChanges(
-                Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
+        try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) {
+            // Register these partitions to RemoteLogMetadataManager, which loads the respective metadata snapshots.
+            topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(
+                    Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
 
-        // Check for the stored entries from the earlier run.
-        TestUtils.waitForCondition(() ->
-            TestUtils.sameElementsWithoutOrder(Collections.singleton(leaderSegmentMetadata).iterator(),
-                topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)),
-            "Remote log segment metadata not available");
-        TestUtils.waitForCondition(() ->
-            TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
-                topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition)),
-            "Remote log segment metadata not available");
-
-        // Add one more segment
-        RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata(
-            new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
-            101, 200, -1L, 0,
-            time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L));
-        topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
+            // Check for the stored entries from the earlier run.
+            TestUtils.waitForCondition(() ->
+                TestUtils.sameElementsWithoutOrder(Collections.singleton(leaderSegmentMetadata).iterator(),
+                    topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)),
+                "Remote log segment metadata not available");
+            TestUtils.waitForCondition(() ->
+                TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
+                    topicBasedRemoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)),
+                "Remote log segment metadata not available");
 
-        // Check that both the stored segment and recently added segment are available.
-        Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(),
-            topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
+            // Add one more segment
+            RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                101, 200, -1L, 0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L));
+            topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
+
+            // Check that both the stored segment and recently added segment are available.
+            Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(),
+                topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)));
+        }
     }
 }
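
Condensed, the restart idiom the migrated test relies on: publish metadata through one TopicBasedRemoteLogMetadataManager instance, close it, and verify that a fresh instance rebuilds the same view from the metadata topic. A sketch assuming the same imports as the test above; writeThenVerifyAfterRestart and the createManager supplier are hypothetical helpers, not part of this commit:

import java.util.Collections;
import java.util.function.Supplier;

static void writeThenVerifyAfterRestart(Supplier<TopicBasedRemoteLogMetadataManager> createManager,
                                        TopicIdPartition partition,
                                        RemoteLogSegmentMetadata metadata) throws Exception {
    // First incarnation: register the partition and publish segment metadata.
    try (TopicBasedRemoteLogMetadataManager rlmm = createManager.get()) {
        rlmm.onPartitionLeadershipChanges(Collections.singleton(partition), Collections.emptySet());
        rlmm.addRemoteLogSegmentMetadata(metadata).get();
    } // close() plays the role of the old stopTopicBasedRemoteLogMetadataManagerHarness().

    // Second incarnation: re-register, then wait until the pre-restart metadata is listed again.
    try (TopicBasedRemoteLogMetadataManager rlmm = createManager.get()) {
        rlmm.onPartitionLeadershipChanges(Collections.singleton(partition), Collections.emptySet());
        TestUtils.waitForCondition(() ->
            TestUtils.sameElementsWithoutOrder(Collections.singleton(metadata).iterator(),
                rlmm.listRemoteLogSegments(partition)),
            "Remote log segment metadata not available after restart");
    }
}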