KAFKA-18486 Migrate ReplicaManagerTest#testTransactionAddPartitionRetry and other similar methods to use applyDelta (#19965)
CI / build (push) Waiting to run Details

Change becomeLeaderOrFollower to applyDelta in following test cases:

- testTransactionAddPartitionRetry
- testTransactionVerificationBlocksOutOfOrderSequence
- testTransactionVerificationDynamicDisablement
- testTransactionVerificationFlow
- testTransactionVerificationGuardOnMultiplePartitions
- testTransactionVerificationRejectsLowerProducerEpoch

Reviewers: Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
Jhen-Yung Hsu 2025-06-16 03:48:36 +08:00 committed by GitHub
parent a83bfda39b
commit 86419e9b8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 24 additions and 29 deletions

View File

@ -2121,12 +2121,13 @@ class ReplicaManagerTest {
val producerEpoch = 0.toShort val producerEpoch = 0.toShort
val sequence = 6 val sequence = 6
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val brokerList = Seq[Integer](0, 1).asJava
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
try { try {
replicaManager.becomeLeaderOrFollower(1, val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList)
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
(_, _) => ()) replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
// Append some transactional records. // Append some transactional records.
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
@ -2189,12 +2190,13 @@ class ReplicaManagerTest {
val sequence = 0 val sequence = 0
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val scheduler = new MockScheduler(time) val scheduler = new MockScheduler(time)
val brokerList = Seq[Integer](0, 1).asJava
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), scheduler = scheduler) val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0), scheduler = scheduler)
try { try {
replicaManager.becomeLeaderOrFollower(1, val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList)
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
(_, _) => ()) replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
// Append some transactional records. // Append some transactional records.
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
@ -2255,12 +2257,13 @@ class ReplicaManagerTest {
val producerEpoch = 0.toShort val producerEpoch = 0.toShort
val sequence = 0 val sequence = 0
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val brokerList = Seq[Integer](0, 1).asJava
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
try { try {
replicaManager.becomeLeaderOrFollower(1, val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList)
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
(_, _) => ()) replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
// Start with sequence 0 // Start with sequence 0
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
@ -2320,21 +2323,15 @@ class ReplicaManagerTest {
val lowerProducerEpoch= 4.toShort val lowerProducerEpoch= 4.toShort
val sequence = 6 val sequence = 6
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val brokerList = Seq[Integer](0, 1).asJava
val replicaManager = val replicaManager =
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
try { try {
replicaManager.becomeLeaderOrFollower( val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList)
1, val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
makeLeaderAndIsrRequest( replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
topicIds(tp0.topic),
tp0,
Seq(0, 1),
new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)
),
(_, _) => ()
)
// first append with epoch 5 // first append with epoch 5
val transactionalRecords = MemoryRecords.withTransactionalRecords( val transactionalRecords = MemoryRecords.withTransactionalRecords(
@ -2390,6 +2387,7 @@ class ReplicaManagerTest {
@Test @Test
def testTransactionVerificationGuardOnMultiplePartitions(): Unit = { def testTransactionVerificationGuardOnMultiplePartitions(): Unit = {
val localId = 0
val mockTimer = new MockTimer(time) val mockTimer = new MockTimer(time)
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1) val tp1 = new TopicPartition(topic, 1)
@ -2400,13 +2398,9 @@ class ReplicaManagerTest {
val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer) val replicaManager = setupReplicaManagerWithMockedPurgatories(mockTimer)
setupMetadataCacheWithTopicIds(topicIds, replicaManager.metadataCache) setupMetadataCacheWithTopicIds(topicIds, replicaManager.metadataCache)
try { try {
replicaManager.becomeLeaderOrFollower(1, val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, partitions = List(0, 1), List.empty, topic, topicIds(topic))
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava)), val leaderImage = imageFromTopics(leaderDelta.apply())
(_, _) => ()) replicaManager.applyDelta(leaderDelta, leaderImage)
replicaManager.becomeLeaderOrFollower(1,
makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava)),
(_, _) => ())
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
new SimpleRecord(s"message $sequence".getBytes)) new SimpleRecord(s"message $sequence".getBytes))
@ -2539,12 +2533,13 @@ class ReplicaManagerTest {
val producerEpoch = 0.toShort val producerEpoch = 0.toShort
val sequence = 6 val sequence = 6
val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager]) val addPartitionsToTxnManager = mock(classOf[AddPartitionsToTxnManager])
val brokerList = Seq[Integer](0, 1).asJava
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0)) val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0))
try { try {
replicaManager.becomeLeaderOrFollower(1, val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = brokerList, isr = brokerList)
makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
(_, _) => ()) replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
// Append some transactional records. // Append some transactional records.
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,