From 7c715c02c06f16475faff8aa72048cddb7382c8a Mon Sep 17 00:00:00 2001 From: Ken Huang Date: Wed, 11 Jun 2025 18:52:08 +0800 Subject: [PATCH] KAFKA-18486 Update testClearPurgatoryOnBecomingFollower etc with KRaft mechanism in ReplicaManagerTest (#19924) update the following test to avoid using `becomeLeaderOrFollower` - testClearPurgatoryOnBecomingFollower - testDelayedFetchIncludesAbortedTransactions - testDisabledTransactionVerification - testFailedBuildRemoteLogAuxStateMetrics Reviewers: TengYao Chi , Chia-Ping Tsai --- .../kafka/server/ReplicaManagerTest.scala | 81 ++++--------------- 1 file changed, 17 insertions(+), 64 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 04a1677b474..ee2115c635f 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -445,26 +445,15 @@ class ReplicaManagerTest { try { val brokerList = Seq[Integer](0, 1).asJava - val topicIds = Collections.singletonMap(topic, topicId) - val partition = rm.createPartition(new TopicPartition(topic, 0)) + val topicPartition = new TopicPartition(topic, 0) + val partition = rm.createPartition(topicPartition) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) // Make this replica the leader. - val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(0) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(false)).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + val delta = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(0), brokerList, brokerList) + val leaderMetadataImage = imageFromTopics(delta.apply()) + rm.applyDelta(delta, leaderMetadataImage) rm.getPartitionOrException(new TopicPartition(topic, 0)) .localLogOrException @@ -474,20 +463,9 @@ class ReplicaManagerTest { } // Make this replica the follower - val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(1) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(false)).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ()) + val delta1 = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(1), brokerList, brokerList, 1) + val followerMetadataImage = imageFromTopics(delta1.apply()) + rm.applyDelta(delta1, followerMetadataImage) assertTrue(appendResult.hasFired) } finally { @@ -945,20 +923,9 @@ class ReplicaManagerTest { new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None) // Make this replica the leader. - val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(0) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + val delta = topicsCreateDelta(brokerList.get(0), isStartIdLeader = true, partitions = List(0), List.empty, topic, topicIds(topic)) + val leaderMetadataImage = imageFromTopics(delta.apply()) + replicaManager.applyDelta(delta, leaderMetadataImage) replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) .localLogOrException @@ -2548,8 +2515,9 @@ class ReplicaManagerTest { val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp), config = config) try { - val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava)) - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), List.empty, topic, topicIds(topic)) + val leaderMetadataImage = imageFromTopics(delta.apply()) + replicaManager.applyDelta(delta, leaderMetadataImage) val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) @@ -4110,23 +4078,6 @@ class ReplicaManagerTest { try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava - val topicIds = Map(tp0.topic -> topicId).asJava - val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq( - new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(0) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true) - ).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count) @@ -4135,7 +4086,9 @@ class ReplicaManagerTest { assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count) assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count) - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 0), 1, util.List.of(0, 1), util.List.of(0, 1)) + val leaderMetadataImage = imageFromTopics(delta.apply()) + replicaManager.applyDelta(delta, leaderMetadataImage) // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing // We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata