KAFKA-18486: Migrate ReplicaManagerTest to use applyDelta (#19954)
CI / build (push) Waiting to run Details

Change becomeLeaderOrFollower to applyDelta in following test cases

- testReadCommittedFetchLimitedAtLSO
- testReceiveOutOfOrderSequenceExceptionWithLogStartOffset
- testRemoteFetchExpiresPerSecMetric
- testRemoteLogReaderMetrics

Reviewers: PoAn Yang <payang@apache.org>, Bolin Lin
<linbolin1230@gmail.com>, Yung <yungyung7654321@gmail.com>, Jimmy Wang
<48462172+JimmyWang6@users.noreply.github.com>, Ken Huang
<s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
This commit is contained in:
Lan Ding 2025-06-14 11:40:51 +08:00 committed by GitHub
parent 99e1e684ef
commit 081deaa1a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 13 additions and 32 deletions

View File

@ -452,7 +452,7 @@ class ReplicaManagerTest {
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
val delta = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(0), brokerList, brokerList)
val delta = createLeaderDelta(topicId, topicPartition, brokerList.get(0), brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
rm.applyDelta(delta, leaderMetadataImage)
rm.getPartitionOrException(new TopicPartition(topic, 0))
@ -464,7 +464,7 @@ class ReplicaManagerTest {
}
// Make this replica the follower
val delta1 = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(1), brokerList, brokerList, 1)
val delta1 = createLeaderDelta(topicId, topicPartition, brokerList.get(1), brokerList, brokerList, 1)
val followerMetadataImage = imageFromTopics(delta1.apply())
rm.applyDelta(delta1, followerMetadataImage)
@ -585,25 +585,16 @@ class ReplicaManagerTest {
try {
val brokerList = Seq[Integer](0, 1).asJava
val partition = replicaManager.createPartition(new TopicPartition(topic, 0))
val tp = new TopicPartition(topic, 0)
val partition = replicaManager.createPartition(tp)
partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
Collections.singletonMap(topic, topicId),
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
val delta = createLeaderDelta(topicId, tp, 0, brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
@ -802,20 +793,10 @@ class ReplicaManagerTest {
new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None)
// Make this replica the leader.
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(topic)
.setPartitionIndex(0)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(brokerList)
.setPartitionEpoch(0)
.setReplicas(brokerList)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0), 0, brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)
replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
@ -3974,7 +3955,7 @@ class ReplicaManagerTest {
assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
val brokerList = Seq[Integer](0, 1).asJava
val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 0), brokerList.get(1), brokerList, brokerList)
val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0), brokerList.get(1), brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage)