From 3a0a1705a1a7caa9b4b14158b05325138c904451 Mon Sep 17 00:00:00 2001 From: Kuan-Po Tseng Date: Tue, 10 Jun 2025 12:39:32 +0800 Subject: [PATCH] KAFKA-18486 Remove becomeLeaderOrFollower from readFromLogWithOffsetOutOfRange and other related methods. (#19929) refactor out becomeLeaderOrFollower in below tests - readFromLogWithOffsetOutOfRange - testBecomeFollowerWhileNewClientFetchInPurgatory - testBecomeFollowerWhileOldClientFetchInPurgatory - testBuildRemoteLogAuxStateMetricsThrowsException Reviewers: Chia-Ping Tsai , Ken Huang , TengYao Chi --- .../kafka/server/ReplicaManagerTest.scala | 136 +++++------------- 1 file changed, 38 insertions(+), 98 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index de1441e5be9..f90b7fa9e55 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2017,22 +2017,10 @@ class ReplicaManagerTest { val tidp0 = new TopicIdPartition(topicId, tp0) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava - val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(1) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, Optional.empty()) @@ -2040,20 +2028,9 @@ class ReplicaManagerTest { assertFalse(fetchResult.hasFired) // Become a follower and ensure that the delayed fetch returns immediately - val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(2) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) + val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0, leaderId = 1, leaderEpoch = 2) + val followerMetadataImage = imageFromTopics(followerDelta.apply()) + replicaManager.applyDelta(followerDelta, followerMetadataImage) assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.assertFired.error) } finally { replicaManager.shutdown(checkpointHW = false) @@ -2072,20 +2049,9 @@ class ReplicaManagerTest { replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val partition0Replicas = Seq[Integer](0, 1).asJava - val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(1) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = 1, replicas = partition0Replicas, isr = partition0Replicas) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) val clientMetadata = new DefaultClientMetadata("", "", null, KafkaPrincipal.ANONYMOUS, "") val partitionData = new FetchRequest.PartitionData(Uuid.ZERO_UUID, 0L, 0L, 100, @@ -2100,20 +2066,9 @@ class ReplicaManagerTest { assertFalse(fetchResult.hasFired) // Become a follower and ensure that the delayed fetch returns immediately - val becomeFollowerRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(2) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ()) + val followerDelta = createFollowerDelta(topicId, tp0, followerId = 0, leaderId = 1, leaderEpoch = 2) + val followerMetadataImage = imageFromTopics(followerDelta.apply()) + replicaManager.applyDelta(followerDelta, followerMetadataImage) assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResult.assertFired.error) } finally { replicaManager.shutdown(checkpointHW = false) @@ -4215,22 +4170,6 @@ class ReplicaManagerTest { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) val partition0Replicas = Seq[Integer](0, 1).asJava - val topicIds = Map(tp0.topic -> topicId).asJava - val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq( - new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(1) - .setLeaderEpoch(0) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true) - ).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() // Verify the metrics for build remote log state and for failures is zero before replicas start to fetch assertEquals(0, brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count) @@ -4239,7 +4178,9 @@ class ReplicaManagerTest { assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count) assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count) - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 1, replicas = partition0Replicas, isr = partition0Replicas) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) // Replicas fetch from the leader periodically, therefore we check that the metric value is increasing // We expect failedBuildRemoteLogAuxStateRate to increase because fetchRemoteLogSegmentMetadata returns RemoteStorageException @@ -4558,9 +4499,17 @@ class ReplicaManagerTest { } } - private def createLeaderDelta(topicId: Uuid, partition: TopicPartition, leaderId: Int): TopicsDelta = { + private def createLeaderDelta( + topicId: Uuid, + partition: TopicPartition, + leaderId: Integer, + replicas: util.List[Integer] = null, + isr: util.List[Integer] = null, + leaderEpoch: Int = 0): TopicsDelta = { val delta = new TopicsDelta(TopicsImage.EMPTY) - + val effectiveReplicas = Option(replicas).getOrElse(java.util.List.of(leaderId)) + val effectiveIsr = Option(isr).getOrElse(java.util.List.of(leaderId)) + delta.replay(new TopicRecord() .setName(partition.topic) .setTopicId(topicId) @@ -4569,19 +4518,24 @@ class ReplicaManagerTest { delta.replay(new PartitionRecord() .setPartitionId(partition.partition) .setTopicId(topicId) - .setReplicas(util.Arrays.asList(leaderId)) - .setIsr(util.Arrays.asList(leaderId)) + .setReplicas(effectiveReplicas) + .setIsr(effectiveIsr) .setRemovingReplicas(Collections.emptyList()) .setAddingReplicas(Collections.emptyList()) .setLeader(leaderId) - .setLeaderEpoch(0) + .setLeaderEpoch(leaderEpoch) .setPartitionEpoch(0) ) delta } - private def createFollowerDelta(topicId: Uuid, partition: TopicPartition, followerId: Int, leaderId: Int): TopicsDelta = { + private def createFollowerDelta( + topicId: Uuid, + partition: TopicPartition, + followerId: Int, + leaderId: Int, + leaderEpoch: Int = 0): TopicsDelta = { val delta = new TopicsDelta(TopicsImage.EMPTY) delta.replay(new TopicRecord() @@ -4597,7 +4551,7 @@ class ReplicaManagerTest { .setRemovingReplicas(Collections.emptyList()) .setAddingReplicas(Collections.emptyList()) .setLeader(leaderId) - .setLeaderEpoch(0) + .setLeaderEpoch(leaderEpoch) .setPartitionEpoch(0) ) @@ -6352,24 +6306,10 @@ class ReplicaManagerTest { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) replicaManager.createPartition(tp).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints = offsetCheckpoints, None) val partition0Replicas = Seq[Integer](0, 1).asJava - val topicIds = Map(tp.topic -> topicId).asJava val leaderEpoch = 0 - val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq( - new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp.topic) - .setPartitionIndex(tp.partition) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(0) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true) - ).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp, leaderId = 0, leaderEpoch = leaderEpoch, replicas = partition0Replicas, isr = partition0Replicas) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) val params = new FetchParams(-1, 1, 1000, 0, 100, FetchIsolation.HIGH_WATERMARK, Optional.empty) replicaManager.readFromLog(