KAFKA-18486 Remove becomeLeaderOrFollower from readFromLogWithOffsetOutOfRange and other related methods. (#19929)

refactor out becomeLeaderOrFollower in below tests
- readFromLogWithOffsetOutOfRange
- testBecomeFollowerWhileNewClientFetchInPurgatory
- testBecomeFollowerWhileOldClientFetchInPurgatory
- testBuildRemoteLogAuxStateMetricsThrowsException

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Ken Huang
 <s7133700@gmail.com>, TengYao Chi <frankvicky@apache.org>
This commit is contained in:
Kuan-Po Tseng 2025-06-10 12:39:32 +08:00 committed by GitHub
parent dbfda79951
commit 3a0a1705a1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 38 additions and 98 deletions

View File

@ -2017,22 +2017,10 @@ class ReplicaManagerTest {
val tidp0 = new TopicIdPartition(topicId, tp0)
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0)
val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100,
Optional.empty())
@ -2040,20 +2028,9 @@ class ReplicaManagerTest {
assertFalse(fetchResult.hasFired)
// Become a follower and ensure that the delayed fetch returns immediately
val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(2)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0, leaderId = 1, leaderEpoch = 2)
val followerMetadataImage = imageFromTopics(followerDelta.apply())
replicaManager.applyDelta(followerDelta, followerMetadataImage)
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error)
} finally {
replicaManager.shutdown(checkpointHW = false)
@ -2072,20 +2049,9 @@ class ReplicaManagerTest {
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(1)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = 1, replicas = partition0Replicas, isr = partition0Replicas)
val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "")
val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100,
@ -2100,20 +2066,9 @@ class ReplicaManagerTest {
assertFalse(fetchResult.hasFired)
// Become a follower and ensure that the delayed fetch returns immediately
val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(2)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)).asJava,
topicIds.asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0, leaderId = 1, leaderEpoch = 2)
val followerMetadataImage = imageFromTopics(followerDelta.apply())
replicaManager.applyDelta(followerDelta, followerMetadataImage)
assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.assertFired.error)
} finally {
replicaManager.shutdown(checkpointHW = false)
@ -4215,22 +4170,6 @@ class ReplicaManagerTest {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(
new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(1)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
// Verify the metrics for build remote log state and for failures is zero before replicas start to fetch
assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
@ -4239,7 +4178,9 @@ class ReplicaManagerTest {
assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = partition0Replicas, isr = partition0Replicas)
val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
// Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
// We expect failedBuildRemoteLogAuxStateRate to increase because fetchRemoteLogSegmentMetadata returns RemoteStorageException
@ -4558,9 +4499,17 @@ class ReplicaManagerTest {
}
}
private def createLeaderDelta(topicId: Uuid, partition: TopicPartition, leaderId: Int): TopicsDelta = {
private def createLeaderDelta(
topicId: Uuid,
partition: TopicPartition,
leaderId: Integer,
replicas: util.List[Integer] = null,
isr: util.List[Integer] = null,
leaderEpoch: Int = 0): TopicsDelta = {
val delta = new TopicsDelta(TopicsImage.EMPTY)
val effectiveReplicas = Option(replicas).getOrElse(java.util.List.of(leaderId))
val effectiveIsr = Option(isr).getOrElse(java.util.List.of(leaderId))
delta.replay(new TopicRecord()
.setName(partition.topic)
.setTopicId(topicId)
@ -4569,19 +4518,24 @@ class ReplicaManagerTest {
delta.replay(new PartitionRecord()
.setPartitionId(partition.partition)
.setTopicId(topicId)
.setReplicas(util.Arrays.asList(leaderId))
.setIsr(util.Arrays.asList(leaderId))
.setReplicas(effectiveReplicas)
.setIsr(effectiveIsr)
.setRemovingReplicas(Collections.emptyList())
.setAddingReplicas(Collections.emptyList())
.setLeader(leaderId)
.setLeaderEpoch(0)
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
)
delta
}
private def createFollowerDelta(topicId: Uuid, partition: TopicPartition, followerId: Int, leaderId: Int): TopicsDelta = {
private def createFollowerDelta(
topicId: Uuid,
partition: TopicPartition,
followerId: Int,
leaderId: Int,
leaderEpoch: Int = 0): TopicsDelta = {
val delta = new TopicsDelta(TopicsImage.EMPTY)
delta.replay(new TopicRecord()
@ -4597,7 +4551,7 @@ class ReplicaManagerTest {
.setRemovingReplicas(Collections.emptyList())
.setAddingReplicas(Collections.emptyList())
.setLeader(leaderId)
.setLeaderEpoch(0)
.setLeaderEpoch(leaderEpoch)
.setPartitionEpoch(0)
)
@ -6352,24 +6306,10 @@ class ReplicaManagerTest {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp.topic -> topicId).asJava
val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch,
Seq(
new LeaderAndIsrRequest.PartitionState()
.setTopicName(tp.topic)
.setPartitionIndex(tp.partition)
.setControllerEpoch(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, leaderEpoch = leaderEpoch, replicas = partition0Replicas, isr = partition0Replicas)
val leaderMetadataImage = imageFromTopics(leaderDelta.apply())
replicaManager.applyDelta(leaderDelta, leaderMetadataImage)
val params = new FetchParams(-1, 1, 1000, 0, 100, FetchIsolation.HIGH_WATERMARK, Optional.empty)
replicaManager.readFromLog(