diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1cd340aa329..956659c565a 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -3727,25 +3727,11 @@ class ReplicaManagerTest { try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava - val topicIds = Map(tp0.topic -> topicId).asJava + val leaderEpoch = 0 - val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq( - new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(leaderEpoch) - .setLeaderEpoch(0) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true) - ).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) val params = new FetchParams(replicaId, 1, 1000, 10, 100, FetchIsolation.LOG_END, Optional.empty) val fetchOffset = 1 @@ -3840,25 +3826,11 @@ class ReplicaManagerTest { try { val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava) replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None) - val partition0Replicas = Seq[Integer](0, 1).asJava - val topicIds = Map(tp0.topic -> topicId).asJava + val leaderEpoch = 0 - val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq( - new LeaderAndIsrRequest.PartitionState() - .setTopicName(tp0.topic) - .setPartitionIndex(tp0.partition) - .setControllerEpoch(0) - .setLeader(leaderEpoch) - .setLeaderEpoch(0) - .setIsr(partition0Replicas) - .setPartitionEpoch(0) - .setReplicas(partition0Replicas) - .setIsNew(true) - ).asJava, - topicIds, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ()) + val leaderDelta = createLeaderDelta(topicId, tp0, leaderId = 0, leaderEpoch = leaderEpoch) + val leaderMetadataImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderMetadataImage) val mockLog = replicaManager.getPartitionOrException(tp0).log.get when(mockLog.endOffsetForEpoch(anyInt())).thenReturn(Optional.of(new OffsetAndEpoch(1, 1)))