diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java index a063fa8820a..7af78e750a8 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java @@ -53,12 +53,6 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, RemoteLogMetadataTopicPartitioner::new, remotePartitionMetadataStoreSupplier); } - public void initializeRemoteLogMetadataManager(Set topicIdPartitions, - boolean startConsumerThread, - Function remoteLogMetadataTopicPartitioner) { - initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread, remoteLogMetadataTopicPartitioner, RemotePartitionMetadataStore::new); - } - public void initializeRemoteLogMetadataManager(Set topicIdPartitions, boolean startConsumerThread, Function remoteLogMetadataTopicPartitioner, @@ -70,6 +64,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa .startConsumerThread(startConsumerThread) .remoteLogMetadataTopicPartitioner(remoteLogMetadataTopicPartitioner) .remotePartitionMetadataStore(remotePartitionMetadataStoreSupplier) + .overrideRemoteLogMetadataManagerProps(overrideRemoteLogMetadataManagerProps()) .build(); } diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java index c599259ed94..07b08145731 100644 --- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java +++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java @@ -16,6 +16,11 @@ */ package org.apache.kafka.server.log.remote.metadata.storage; +import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterTest; +import kafka.test.junit.ClusterTestExtensions; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.common.TopicIdPartition; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; @@ -24,139 +29,97 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId; import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; import org.apache.kafka.test.TestUtils; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import scala.collection.JavaConverters; -import scala.collection.Seq; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.extension.ExtendWith; -import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR; -@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters +@ExtendWith(value = ClusterTestExtensions.class) +@Tag("integration") public class TopicBasedRemoteLogMetadataManagerRestartTest { private static final int SEG_SIZE = 1024 * 1024; private final Time time = new MockTime(1); private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath(); + private final ClusterInstance clusterInstance; - private TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness; - - @BeforeEach - public void setup() { - // Start the cluster and initialize TopicBasedRemoteLogMetadataManager. - remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness() { - protected Map overrideRemoteLogMetadataManagerProps() { - Map props = new HashMap<>(); - props.put(LOG_DIR, logDir); - return props; - } - }; - remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true); + TopicBasedRemoteLogMetadataManagerRestartTest(ClusterInstance clusterInstance) { // Constructor injections + this.clusterInstance = clusterInstance; } - private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) { - remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread, RemoteLogMetadataTopicPartitioner::new); + private TopicBasedRemoteLogMetadataManager createTopicBasedRemoteLogMetadataManager() { + return RemoteLogMetadataManagerTestUtils.builder() + .topicIdPartitions(Collections.emptySet()) + .bootstrapServers(clusterInstance.bootstrapServers()) + .startConsumerThread(true) + .remoteLogMetadataTopicPartitioner(RemoteLogMetadataTopicPartitioner::new) + .overrideRemoteLogMetadataManagerProps(Collections.singletonMap(LOG_DIR, logDir)) + .build(); } - @AfterEach - public void teardown() throws IOException { - if (remoteLogMetadataManagerHarness != null) { - remoteLogMetadataManagerHarness.close(); - } - } - - private void stopTopicBasedRemoteLogMetadataManagerHarness() { - remoteLogMetadataManagerHarness.closeRemoteLogMetadataManager(); - } - - private TopicBasedRemoteLogMetadataManager topicBasedRlmm() { - return remoteLogMetadataManagerHarness.remoteLogMetadataManager(); - } - - @Test + @ClusterTest(brokers = 3) public void testRLMMAPIsAfterRestart() throws Exception { // Create topics. String leaderTopic = "new-leader"; - HashMap> assignedLeaderTopicReplicas = new HashMap<>(); - List leaderTopicReplicas = new ArrayList<>(); - // Set broker id 0 as the first entry which is taken as the leader. - leaderTopicReplicas.add(0); - leaderTopicReplicas.add(1); - leaderTopicReplicas.add(2); - assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas)); - remoteLogMetadataManagerHarness.createTopicWithAssignment( - leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas), - remoteLogMetadataManagerHarness.listenerName()); - String followerTopic = "new-follower"; - HashMap> assignedFollowerTopicReplicas = new HashMap<>(); - List followerTopicReplicas = new ArrayList<>(); - // Set broker id 1 as the first entry which is taken as the leader. - followerTopicReplicas.add(1); - followerTopicReplicas.add(2); - followerTopicReplicas.add(0); - assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas)); - remoteLogMetadataManagerHarness.createTopicWithAssignment(followerTopic, - JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas), - remoteLogMetadataManagerHarness.listenerName()); + try (Admin admin = clusterInstance.createAdminClient()) { + // Set broker id 0 as the first entry which is taken as the leader. + NewTopic newLeaderTopic = new NewTopic(leaderTopic, Collections.singletonMap(0, Arrays.asList(0, 1, 2))); + // Set broker id 1 as the first entry which is taken as the leader. + NewTopic newFollowerTopic = new NewTopic(followerTopic, Collections.singletonMap(0, Arrays.asList(1, 2, 0))); + admin.createTopics(Arrays.asList(newLeaderTopic, newFollowerTopic)); + } final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0)); final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0)); - - // Register these partitions to RLMM. - topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition)); - - // Add segments for these partitions, but they are not available as they have not yet been subscribed. RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get(); - RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata( new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()), 0, 100, -1L, 0, time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L)); - topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get(); - // Stop TopicBasedRemoteLogMetadataManager only. - stopTopicBasedRemoteLogMetadataManagerHarness(); + try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) { + // Register these partitions to RemoteLogMetadataManager. + topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges( + Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition)); - // Start TopicBasedRemoteLogMetadataManager - startTopicBasedRemoteLogMetadataManagerHarness(true); + // Add segments for these partitions, but they are not available as they have not yet been subscribed. + topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata).get(); + topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(followerSegmentMetadata).get(); + } - // Register these partitions to RLMM, which loads the respective metadata snapshots. - topicBasedRlmm().onPartitionLeadershipChanges( - Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition)); + try (TopicBasedRemoteLogMetadataManager topicBasedRemoteLogMetadataManager = createTopicBasedRemoteLogMetadataManager()) { + // Register these partitions to RemoteLogMetadataManager, which loads the respective metadata snapshots. + topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges( + Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition)); - // Check for the stored entries from the earlier run. - TestUtils.waitForCondition(() -> - TestUtils.sameElementsWithoutOrder(Collections.singleton(leaderSegmentMetadata).iterator(), - topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)), - "Remote log segment metadata not available"); - TestUtils.waitForCondition(() -> - TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(), - topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition)), - "Remote log segment metadata not available"); - // Add one more segment - RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata( - new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), - 101, 200, -1L, 0, - time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L)); - topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get(); + // Check for the stored entries from the earlier run. + TestUtils.waitForCondition(() -> + TestUtils.sameElementsWithoutOrder(Collections.singleton(leaderSegmentMetadata).iterator(), + topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition)), + "Remote log segment metadata not available"); + TestUtils.waitForCondition(() -> + TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(), + topicBasedRemoteLogMetadataManager.listRemoteLogSegments(followerTopicIdPartition)), + "Remote log segment metadata not available"); + // Add one more segment + RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata( + new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()), + 101, 200, -1L, 0, + time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L)); + topicBasedRemoteLogMetadataManager.addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get(); - // Check that both the stored segment and recently added segment are available. - Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(), - topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition))); + // Check that both the stored segment and recently added segment are available. + Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(), + topicBasedRemoteLogMetadataManager.listRemoteLogSegments(leaderTopicIdPartition))); + } } } \ No newline at end of file