diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 956659c565a..677acf87376 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -452,7 +452,7 @@ class ReplicaManagerTest { partition.createLogIfNotExists(isNew = false, isFutureReplica = false, new LazyOffsetCheckpoints(rm.highWatermarkCheckpoints.asJava), None) // Make this replica the leader. - val delta = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(0), brokerList, brokerList) + val delta = createLeaderDelta(topicId, topicPartition, brokerList.get(0), brokerList, brokerList) val leaderMetadataImage = imageFromTopics(delta.apply()) rm.applyDelta(delta, leaderMetadataImage) rm.getPartitionOrException(new TopicPartition(topic, 0)) @@ -464,7 +464,7 @@ class ReplicaManagerTest { } // Make this replica the follower - val delta1 = createLeaderDelta(topicIds(topic), topicPartition, brokerList.get(1), brokerList, brokerList, 1) + val delta1 = createLeaderDelta(topicId, topicPartition, brokerList.get(1), brokerList, brokerList, 1) val followerMetadataImage = imageFromTopics(delta1.apply()) rm.applyDelta(delta1, followerMetadataImage) @@ -585,25 +585,16 @@ class ReplicaManagerTest { try { val brokerList = Seq[Integer](0, 1).asJava - val partition = replicaManager.createPartition(new TopicPartition(topic, 0)) + val tp = new TopicPartition(topic, 0) + val partition = replicaManager.createPartition(tp) partition.createLogIfNotExists(isNew = false, isFutureReplica = false, new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None) // Make this replica the leader. - val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(0) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(true)).asJava, - Collections.singletonMap(topic, topicId), - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + val delta = createLeaderDelta(topicId, tp, 0, brokerList, brokerList) + val leaderMetadataImage = imageFromTopics(delta.apply()) + + replicaManager.applyDelta(delta, leaderMetadataImage) replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) .localLogOrException @@ -802,20 +793,10 @@ class ReplicaManagerTest { new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), None) // Make this replica the leader. - val leaderAndIsrRequest1 = new LeaderAndIsrRequest.Builder(0, 0, brokerEpoch, - Seq(new LeaderAndIsrRequest.PartitionState() - .setTopicName(topic) - .setPartitionIndex(0) - .setControllerEpoch(0) - .setLeader(0) - .setLeaderEpoch(0) - .setIsr(brokerList) - .setPartitionEpoch(0) - .setReplicas(brokerList) - .setIsNew(true)).asJava, - topicIds.asJava, - Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build() - replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ()) + val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0), 0, brokerList, brokerList) + val leaderMetadataImage = imageFromTopics(delta.apply()) + + replicaManager.applyDelta(delta, leaderMetadataImage) replicaManager.getPartitionOrException(new TopicPartition(topic, 0)) .localLogOrException @@ -3974,7 +3955,7 @@ class ReplicaManagerTest { assertEquals(0, brokerTopicStats.allTopicsStats.failedBuildRemoteLogAuxStateRate.count) val brokerList = Seq[Integer](0, 1).asJava - val delta = createLeaderDelta(topicIds(topic), new TopicPartition(topic, 0), brokerList.get(1), brokerList, brokerList) + val delta = createLeaderDelta(topicId, new TopicPartition(topic, 0), brokerList.get(1), brokerList, brokerList) val leaderMetadataImage = imageFromTopics(delta.apply()) replicaManager.applyDelta(delta, leaderMetadataImage)