From 86419e9b8aedbda0434515690a649065948ff918 Mon Sep 17 00:00:00 2001 From: Jhen-Yung Hsu Date: Mon, 16 Jun 2025 03:48:36 +0800 Subject: [PATCH] KAFKA-18486 Migrate ReplicaManagerTest#testTransactionAddPartitionRetry and other similar methods to use applyDelta (#19965) Change becomeLeaderOrFollower to applyDelta in following test cases: - testTransactionAddPartitionRetry - testTransactionVerificationBlocksOutOfOrderSequence - testTransactionVerificationDynamicDisablement - testTransactionVerificationFlow - testTransactionVerificationGuardOnMultiplePartitions - testTransactionVerificationRejectsLowerProducerEpoch Reviewers: Ken Huang , Chia-Ping Tsai --- .../kafka/server/ReplicaManagerTest.scala | 53 +++++++++---------- 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 677acf87376..d5d500c87c7 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2121,12 +2121,13 @@ class ReplicaManagerTest { val producerEpoch = 0.toShort val sequence = 6 val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + val brokerList = Seq[Integer](0, 1).asJava val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) try { - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) // Append some transactional records. val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, @@ -2189,12 +2190,13 @@ class ReplicaManagerTest { val sequence = 0 val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) val scheduler = new MockScheduler(time) + val brokerList = Seq[Integer](0, 1).asJava val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), scheduler = scheduler) try { - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) // Append some transactional records. val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, @@ -2255,12 +2257,13 @@ class ReplicaManagerTest { val producerEpoch = 0.toShort val sequence = 0 val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + val brokerList = Seq[Integer](0, 1).asJava val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) try { - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) // Start with sequence 0 val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, @@ -2320,21 +2323,15 @@ class ReplicaManagerTest { val lowerProducerEpoch= 4.toShort val sequence = 6 val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + val brokerList = Seq[Integer](0, 1).asJava val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) try { - replicaManager.becomeLeaderOrFollower( - 1, - makeLeaderAndIsrRequest( - topicIds(tp0.topic), - tp0, - Seq(0, 1), - new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava) - ), - (_, _) => () - ) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) // first append with epoch 5 val transactionalRecords = MemoryRecords.withTransactionalRecords( @@ -2390,6 +2387,7 @@ class ReplicaManagerTest { @Test def testTransactionVerificationGuardOnMultiplePartitions(): Unit = { + val localId = 0 val mockTimer = new MockTimer(time) val tp0 = new TopicPartition(topic, 0) val tp1 = new TopicPartition(topic, 1) @@ -2400,13 +2398,9 @@ class ReplicaManagerTest { val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer) setupMetadataCacheWithTopicIds(topicIds, replicaManager.metadataCache) try { - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) - - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) + val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, partitions = List(0, 1), List.empty, topic, topicIds(topic)) + val leaderImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderImage) val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, new SimpleRecord(s"message $sequence".getBytes)) @@ -2539,12 +2533,13 @@ class ReplicaManagerTest { val producerEpoch = 0.toShort val sequence = 6 val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) + val brokerList = Seq[Integer](0, 1).asJava val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) try { - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) // Append some transactional records. val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,