KAFKA-18486 Migrate ReplicaManagerTest#testOffsetOutOfRangeExceptionWhenFetchMessages to use applyDelta (#19952)

Change `becomeLeaderOrFollower` to `applyDelta` in following test cases
* testOffsetOutOfRangeExceptionWhenFetchMessages
* testOffsetOutOfRangeExceptionWhenReadFromLog
* testOldFollowerLosesMetricsWhenReassignPartitions
* testOldLeaderLosesMetricsWhenReassignPartitions

Reviewers: Bolin Lin <linbolin1230@gmail.com>, Lan Ding
 <isDing_L@163.com>, Ken Huang <s7133700@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
This commit is contained in:
PoAn Yang 2025-06-13 15:47:51 +08:00 committed by GitHub
parent 8c0d7412f4
commit 991a10c19f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 69 additions and 154 deletions

View File

@ -3418,11 +3418,8 @@ class ReplicaManagerTest {
@Test @Test
def testOldLeaderLosesMetricsWhenReassignPartitions(): Unit = { def testOldLeaderLosesMetricsWhenReassignPartitions(): Unit = {
val controllerEpoch = 0
val leaderEpoch = 0 val leaderEpoch = 0
val leaderEpochIncrement = 1 val leaderEpochIncrement = 1
val correlationId = 0
val controllerId = 0
val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats]) val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats])
val (rm0, rm1) = prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), mockTopicStats1) val (rm0, rm1) = prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), mockTopicStats1)
@ -3435,64 +3432,38 @@ class ReplicaManagerTest {
val partition1Replicas = Seq[Integer](1, 0).asJava val partition1Replicas = Seq[Integer](1, 0).asJava
val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(controllerId, 0, brokerEpoch, val delta1 = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch)
Seq( delta1.replay(new PartitionRecord()
new LeaderAndIsrRequest.PartitionState() .setPartitionId(tp1.partition)
.setTopicName(tp0.topic) .setTopicId(topicIds.get(topic))
.setPartitionIndex(tp0.partition) .setIsr(partition1Replicas)
.setControllerEpoch(controllerEpoch) .setReplicas(partition1Replicas)
.setLeader(0) .setRemovingReplicas(util.List.of())
.setLeaderEpoch(leaderEpoch) .setAddingReplicas(util.List.of())
.setIsr(partition0Replicas) .setLeader(partition1Replicas.get(0))
.setPartitionEpoch(0) .setLeaderEpoch(leaderEpoch)
.setReplicas(partition0Replicas) .setPartitionEpoch(0)
.setIsNew(true), )
new LeaderAndIsrRequest.PartitionState() val leaderMetadataImage1 = imageFromTopics(delta1.apply())
.setTopicName(tp1.topic) rm0.applyDelta(delta1, leaderMetadataImage1)
.setPartitionIndex(tp1.partition) rm1.applyDelta(delta1, leaderMetadataImage1)
.setControllerEpoch(controllerEpoch)
.setLeader(1)
.setLeaderEpoch(leaderEpoch)
.setIsr(partition1Replicas)
.setPartitionEpoch(0)
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
// make broker 0 the leader of partition 1 so broker 1 loses its leadership position // make broker 0 the leader of partition 1 so broker 1 loses its leadership position
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder( controllerId, controllerEpoch, brokerEpoch, val delta2 = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch + leaderEpochIncrement)
Seq( delta2.replay(new PartitionRecord()
new LeaderAndIsrRequest.PartitionState() .setPartitionId(tp1.partition)
.setTopicName(tp0.topic) .setTopicId(topicIds.get(topic))
.setPartitionIndex(tp0.partition) .setIsr(partition1Replicas)
.setControllerEpoch(controllerEpoch) .setReplicas(partition1Replicas)
.setLeader(0) .setRemovingReplicas(util.List.of())
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setAddingReplicas(util.List.of())
.setIsr(partition0Replicas) .setLeader(partition1Replicas.get(1))
.setPartitionEpoch(0) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setReplicas(partition0Replicas) .setPartitionEpoch(0)
.setIsNew(true), )
new LeaderAndIsrRequest.PartitionState() val leaderMetadataImage2 = imageFromTopics(delta2.apply())
.setTopicName(tp1.topic) rm0.applyDelta(delta2, leaderMetadataImage2)
.setPartitionIndex(tp1.partition) rm1.applyDelta(delta2, leaderMetadataImage2)
.setControllerEpoch(controllerEpoch)
.setLeader(0)
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setIsr(partition1Replicas)
.setPartitionEpoch(0)
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
} finally { } finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]]( Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => { () => {
@ -3512,11 +3483,8 @@ class ReplicaManagerTest {
@Test @Test
def testOldFollowerLosesMetricsWhenReassignPartitions(): Unit = { def testOldFollowerLosesMetricsWhenReassignPartitions(): Unit = {
val controllerEpoch = 0
val leaderEpoch = 0 val leaderEpoch = 0
val leaderEpochIncrement = 1 val leaderEpochIncrement = 1
val correlationId = 0
val controllerId = 0
val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats]) val mockTopicStats1: BrokerTopicStats = mock(classOf[BrokerTopicStats])
val (rm0, rm1) = prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), mockTopicStats1) val (rm0, rm1) = prepareDifferentReplicaManagers(mock(classOf[BrokerTopicStats]), mockTopicStats1)
@ -3529,65 +3497,38 @@ class ReplicaManagerTest {
val partition1Replicas = Seq[Integer](1, 0).asJava val partition1Replicas = Seq[Integer](1, 0).asJava
val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava val topicIds = Map(tp0.topic -> Uuid.randomUuid(), tp1.topic -> Uuid.randomUuid()).asJava
val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(controllerId, 0, brokerEpoch, val delta = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0), partition0Replicas, partition0Replicas, leaderEpoch)
Seq( delta.replay(new PartitionRecord()
new LeaderAndIsrRequest.PartitionState() .setPartitionId(tp1.partition)
.setTopicName(tp0.topic) .setTopicId(topicIds.get(topic))
.setPartitionIndex(tp0.partition) .setIsr(partition1Replicas)
.setControllerEpoch(controllerEpoch) .setReplicas(partition1Replicas)
.setLeader(1) .setRemovingReplicas(util.List.of())
.setLeaderEpoch(leaderEpoch) .setAddingReplicas(util.List.of())
.setIsr(partition0Replicas) .setLeader(partition1Replicas.get(0))
.setPartitionEpoch(0) .setLeaderEpoch(leaderEpoch)
.setReplicas(partition0Replicas) .setPartitionEpoch(0)
.setIsNew(true), )
new LeaderAndIsrRequest.PartitionState() val leaderMetadataImage = imageFromTopics(delta.apply())
.setTopicName(tp1.topic) rm0.applyDelta(delta, leaderMetadataImage)
.setPartitionIndex(tp1.partition) rm1.applyDelta(delta, leaderMetadataImage)
.setControllerEpoch(controllerEpoch)
.setLeader(1)
.setLeaderEpoch(leaderEpoch)
.setIsr(partition1Replicas)
.setPartitionEpoch(0)
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest1, (_, _) => ())
// make broker 0 the leader of partition 1 so broker 1 loses its leadership position // make broker 0 the leader of partition 1 so broker 1 loses its leadership position
val leaderAndIsrRequest2 = new LeaderAndIsrRequest.Builder(controllerId, val delta2 = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(1), partition0Replicas, partition0Replicas, leaderEpoch + leaderEpochIncrement)
controllerEpoch, brokerEpoch, delta2.replay(new PartitionRecord()
Seq( .setPartitionId(tp1.partition)
new LeaderAndIsrRequest.PartitionState() .setTopicId(topicIds.get(topic))
.setTopicName(tp0.topic) .setIsr(partition1Replicas)
.setPartitionIndex(tp0.partition) .setReplicas(partition1Replicas)
.setControllerEpoch(controllerEpoch) .setRemovingReplicas(util.List.of())
.setLeader(0) .setAddingReplicas(util.List.of())
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement) .setLeader(partition1Replicas.get(1))
.setIsr(partition0Replicas) .setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setPartitionEpoch(0) .setPartitionEpoch(0)
.setReplicas(partition0Replicas) )
.setIsNew(true), val leaderMetadataImage2 = imageFromTopics(delta2.apply())
new LeaderAndIsrRequest.PartitionState() rm0.applyDelta(delta2, leaderMetadataImage2)
.setTopicName(tp1.topic) rm1.applyDelta(delta2, leaderMetadataImage2)
.setPartitionIndex(tp1.partition)
.setControllerEpoch(controllerEpoch)
.setLeader(0)
.setLeaderEpoch(leaderEpoch + leaderEpochIncrement)
.setIsr(partition1Replicas)
.setPartitionEpoch(0)
.setReplicas(partition1Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host0", 0), new Node(1, "host1", 1)).asJava).build()
rm0.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
rm1.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest2, (_, _) => ())
} finally { } finally {
Utils.tryAll(util.Arrays.asList[Callable[Void]]( Utils.tryAll(util.Arrays.asList[Callable[Void]](
() => { () => {
@ -3603,7 +3544,6 @@ class ReplicaManagerTest {
// verify that broker 1 did remove its metrics when no longer being the leader of partition 1 // verify that broker 1 did remove its metrics when no longer being the leader of partition 1
verify(mockTopicStats1).removeOldLeaderMetrics(topic) verify(mockTopicStats1).removeOldLeaderMetrics(topic)
verify(mockTopicStats1).removeOldFollowerMetrics(topic)
} }
private def prepareDifferentReplicaManagers(brokerTopicStats1: BrokerTopicStats, private def prepareDifferentReplicaManagers(brokerTopicStats1: BrokerTopicStats,
@ -3669,22 +3609,9 @@ class ReplicaManagerTest {
val partition0Replicas = Seq[Integer](0, 1).asJava val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0 val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val delta = createLeaderDelta(topicIds.get(topic), tp0, partition0Replicas.get(0), partition0Replicas, partition0Replicas)
Seq( val leaderMetadataImage = imageFromTopics(delta.apply())
new LeaderAndIsrRequest.PartitionState() replicaManager.applyDelta(delta, leaderMetadataImage)
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(leaderEpoch)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
val params = new FetchParams(replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, Optional.empty) val params = new FetchParams(replicaId, 1, 1000, 0, 100, FetchIsolation.LOG_END, Optional.empty)
// when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately // when reading log, it'll throw OffsetOutOfRangeException, which will be handled separately
@ -3713,6 +3640,7 @@ class ReplicaManagerTest {
@ParameterizedTest @ParameterizedTest
@ValueSource(booleans = Array(true, false)) @ValueSource(booleans = Array(true, false))
def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): Unit = { def testOffsetOutOfRangeExceptionWhenFetchMessages(isFromFollower: Boolean): Unit = {
val brokerList = Seq[Integer](0, 1).asJava
val replicaId = if (isFromFollower) 1 else -1 val replicaId = if (isFromFollower) 1 else -1
val tp0 = new TopicPartition(topic, 0) val tp0 = new TopicPartition(topic, 0)
val tidp0 = new TopicIdPartition(topicId, tp0) val tidp0 = new TopicIdPartition(topicId, tp0)
@ -3721,25 +3649,11 @@ class ReplicaManagerTest {
try { try {
val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava)
replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
val partition0Replicas = Seq[Integer](0, 1).asJava
val topicIds = Map(tp0.topic -> topicId).asJava val topicIds = Map(tp0.topic -> topicId).asJava
val leaderEpoch = 0 val leaderEpoch = 0
val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, val delta = createLeaderDelta(topicIds.get(topic), tp0, brokerList.get(0), brokerList, brokerList)
Seq( val leaderMetadataImage = imageFromTopics(delta.apply())
new LeaderAndIsrRequest.PartitionState() replicaManager.applyDelta(delta, leaderMetadataImage)
.setTopicName(tp0.topic)
.setPartitionIndex(tp0.partition)
.setControllerEpoch(0)
.setLeader(leaderEpoch)
.setLeaderEpoch(0)
.setIsr(partition0Replicas)
.setPartitionEpoch(0)
.setReplicas(partition0Replicas)
.setIsNew(true)
).asJava,
topicIds,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty) val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty)
val fetchOffset = 1 val fetchOffset = 1
@ -4087,7 +4001,8 @@ class ReplicaManagerTest {
assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count) assertEquals(0, brokerTopicStats.allTopicsStats.buildRemoteLogAuxStateRequestRate.count)
assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count) assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count)
val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 0), 1, util.List.of(0, 1), util.List.of(0, 1)) val brokerList = Seq[Integer](0, 1).asJava
val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 0), brokerList.get(1), brokerList, brokerList)
val leaderMetadataImage = imageFromTopics(delta.apply()) val leaderMetadataImage = imageFromTopics(delta.apply())
replicaManager.applyDelta(delta, leaderMetadataImage) replicaManager.applyDelta(delta, leaderMetadataImage)