diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 9077f664eff..3611af26488 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -2519,6 +2519,7 @@ class ReplicaManagerTest { @Test def testExceptionWhenUnverifiedTransactionHasMultipleProducerIds(): Unit = { + val localId = 1 val tp0 = new TopicPartition(topic, 0) val tp1 = new TopicPartition(topic, 1) val transactionalId = "txn1" @@ -2531,13 +2532,9 @@ class ReplicaManagerTest { val replicaManager = setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager, List(tp0, tp1)) try { - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) - - replicaManager.becomeLeaderOrFollower(1, - makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)), - (_, _) => ()) + val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, partitions = List(0, 1), List.empty, topic, topicIds(topic)) + val leaderImage = imageFromTopics(leaderDelta.apply()) + replicaManager.applyDelta(leaderDelta, leaderImage) // Append some transactional records with different producer IDs val transactionalRecords = mutable.Map[TopicPartition, MemoryRecords]() @@ -4651,7 +4648,7 @@ class ReplicaManagerTest { try { val directoryIds = replicaManager.logManager.directoryIdsSet.toList assertEquals(directoryIds.size, 2) - val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = directoryIds) + val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = directoryIds) val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), leaderTopicsDelta, FOO_UUID).get partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(), 1, localId, Seq(1, 2)), new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), @@ -4698,7 +4695,7 @@ class ReplicaManagerTest { // Test applying delta as leader val directoryIds = replicaManager.logManager.directoryIdsSet.toList // Make the local replica the leader - val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = directoryIds) + val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = directoryIds) val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) @@ -4708,7 +4705,7 @@ class ReplicaManagerTest { assertEquals(directoryIds.head, logDirIdHostingPartition0) // Test applying delta as follower - val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = directoryIds) + val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = List(1), directoryIds = directoryIds) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) @@ -4736,7 +4733,7 @@ class ReplicaManagerTest { try { // Make the local replica the leader - val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED)) + val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions = List(0), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED)) val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply()) val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0) @@ -4744,7 +4741,7 @@ class ReplicaManagerTest { replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) // Make the local replica the as follower - val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED)) + val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = List(1), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED)) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) @@ -4790,7 +4787,7 @@ class ReplicaManagerTest { replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage) // Make the local replica the as follower - val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 1, directoryIds = List(DirectoryId.LOST, DirectoryId.LOST)) + val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = List(1), directoryIds = List(DirectoryId.LOST, DirectoryId.LOST)) val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage) @@ -5787,23 +5784,26 @@ class ReplicaManagerTest { } } - private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partition:Int = 0, directoryIds: List[Uuid] = List.empty): TopicsDelta = { + private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = { val leader = if (isStartIdLeader) startId else startId + 1 val delta = new TopicsDelta(TopicsImage.EMPTY) - delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID)) - val record = partitionRecord(startId, leader, partition) - if (directoryIds.nonEmpty) { - record.setDirectories(directoryIds.asJava) + delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId)) + + partitions.foreach { partition => + val record = partitionRecord(startId, leader, partition, topicId) + if (directoryIds.nonEmpty) { + record.setDirectories(directoryIds.asJava) + } + delta.replay(record) } - delta.replay(record) delta } - private def partitionRecord(startId: Int, leader: Int, partition: Int = 0) = { + private def partitionRecord(startId: Int, leader: Int, partition: Int = 0, topicId: Uuid = FOO_UUID) = { new PartitionRecord() .setPartitionId(partition) - .setTopicId(FOO_UUID) + .setTopicId(topicId) .setReplicas(util.Arrays.asList(startId, startId + 1)) .setIsr(util.Arrays.asList(startId, startId + 1)) .setRemovingReplicas(Collections.emptyList())