KAFKA-18486 Update testClearPurgatoryOnBecomingFollower etc with KRaft mechanism in ReplicaManagerTest (#19924)
CI / build (push) Waiting to run Details

update the following test to avoid using `becomeLeaderOrFollower`
- testClearPurgatoryOnBecomingFollower
- testDelayedFetchIncludesAbortedTransactions
- testDisabledTransactionVerification
- testFailedBuildRemoteLogAuxStateMetrics

Reviewers: TengYao Chi <kitingiao@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
This commit is contained in:
Ken Huang 2025-06-11 18:52:08 +08:00 committed by GitHub
parent ab42f00bbe
commit 7c715c02c0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 17 additions and 64 deletions

View File

@ -445,26 +445,15 @@ class ReplicaManagerTest {
try { try {
val brokerList = Seq[Integer](0, 1).asJava val brokerList = Seq[Integer](0, 1).asJava
val topicIds = Collections.singletonMap(topic, topicId)
val partition = rm.createPartition(new TopicPartition(topic, 0)) val topicPartition = new TopicPartition(topic, 0)
val partition = rm.createPartition(topicPartition)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false, partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader. // Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val delta = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(0), brokerList, brokerList)
Seq(new LeaderAndIsrRequest.PartitionState() val leaderMetadataImage = imageFromTopics(delta.apply())
.setTopicName(topic) rm.applyDelta(delta, leaderMetadataImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
rm.getPartitionOrException(new TopicPartition(topic, 0)) rm.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException .localLogOrException
@ -474,20 +463,9 @@ class ReplicaManagerTest {
} }
// Make this replica the follower // Make this replica the follower
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val delta1 = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(1), brokerList, brokerList, 1)
Seq(new LeaderAndIsrRequest.PartitionState() val followerMetadataImage = imageFromTopics(delta1.apply())
.setTopicName(topic) rm.applyDelta(delta1, followerMetadataImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(1)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(false)).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(1, leaderAndIsrRequest2, (_, _) => ())
assertTrue(appendResult.hasFired) assertTrue(appendResult.hasFired)
} finally { } finally {
@ -945,20 +923,9 @@ class ReplicaManagerTest {
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None) new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader. // Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val delta = topicsCreateDelta(brokerList.get(0), isStartIdLeader = true, partitions = List(0), List.empty, topic, topicIds(topic))
Seq(new LeaderAndIsrRequest.PartitionState() val leaderMetadataImage = imageFromTopics(delta.apply())
.setTopicName(topic) replicaManager.applyDelta(delta, leaderMetadataImage)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException .localLogOrException
@ -2548,8 +2515,9 @@ class ReplicaManagerTest {
val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp), config = config) val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp), config = config)
try { try {
val becomeLeaderRequest = makeLeaderAndIsrRequest(topicIds(tp.topic), tp, Seq(0, 1), new LeaderAndIsr(0, List(0, 1).map(Int.box).asJava)) val delta = topicsCreateDelta(0, isStartIdLeader = true, partitions = List(0), List.empty, topic, topicIds(topic))
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)
val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence, val transactionalRecords = MemoryRecords.withTransactionalRecords(Compression.NONE, producerId, producerEpoch, sequence,
new SimpleRecord(s"message $sequence".getBytes)) new SimpleRecord(s"message $sequence".getBytes))
@ -4110,23 +4078,6 @@ class ReplicaManagerTest {
try { try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(
new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
// Verify the metrics for build remote log state and for failures is zero before replicas start to fetch // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count) assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@ -4135,7 +4086,9 @@ class ReplicaManagerTest {
assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count) assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count) assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 0), 1, util.List.of(0, 1), util.List.of(0, 1))
val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)
// Replicas fetch from the leader periodically, therefore we check that the metric value is increasing // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
// We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata // We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata