diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala index e05ea8b18a0..384d067e2b6 100755 --- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala @@ -17,14 +17,13 @@ package kafka.admin -import java.util.{Collections, Optional} +import java.util.Collections import kafka.controller.ReplicaAssignment import kafka.server.{BaseRequestTest, BrokerServer} import kafka.utils.TestUtils import kafka.utils.TestUtils._ import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic} import org.apache.kafka.common.errors.InvalidReplicaAssignmentException -import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse} import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{BeforeEach, TestInfo} @@ -178,7 +177,7 @@ class AddPartitionsTest extends BaseRequestTest { } @ParameterizedTest - @ValueSource(strings = Array("zk")) // TODO: add kraft support + @ValueSource(strings = Array("kraft")) def testReplicaPlacementAllServers(quorum: String): Unit = { admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get() @@ -194,17 +193,19 @@ class AddPartitionsTest extends BaseRequestTest { new MetadataRequest.Builder(Seq(topic3).asJava, false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head - validateLeaderAndReplicas(topicMetadata, 0, 2, Set(2, 3, 0, 1)) - validateLeaderAndReplicas(topicMetadata, 1, 3, Set(3, 2, 0, 1)) - validateLeaderAndReplicas(topicMetadata, 2, 0, Set(0, 3, 1, 2)) - validateLeaderAndReplicas(topicMetadata, 3, 1, Set(1, 0, 2, 3)) - validateLeaderAndReplicas(topicMetadata, 4, 2, Set(2, 3, 0, 1)) - validateLeaderAndReplicas(topicMetadata, 5, 3, Set(3, 0, 1, 2)) - validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3)) + + assertEquals(7, topicMetadata.partitionMetadata.size) + for (partition <- topicMetadata.partitionMetadata.asScala) { + val replicas = partition.replicaIds.asScala.toSet + assertEquals(4, replicas.size, s"Partition ${partition.partition} should have 4 replicas") + assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3") + assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader") + assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas") + } } @ParameterizedTest - @ValueSource(strings = Array("zk")) // TODO: add kraft support + @ValueSource(strings = Array("kraft")) def testReplicaPlacementPartialServers(quorum: String): Unit = { admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get() @@ -216,19 +217,15 @@ class AddPartitionsTest extends BaseRequestTest { new MetadataRequest.Builder(Seq(topic2).asJava, false).build) assertEquals(1, response.topicMetadata.size) val topicMetadata = response.topicMetadata.asScala.head - validateLeaderAndReplicas(topicMetadata, 0, 1, Set(1, 2)) - validateLeaderAndReplicas(topicMetadata, 1, 2, Set(0, 2)) - validateLeaderAndReplicas(topicMetadata, 2, 3, Set(1, 3)) - } - def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int, - expectedReplicas: Set[Int]): Unit = { - val partitionOpt = metadata.partitionMetadata.asScala.find(_.partition == partitionId) - assertTrue(partitionOpt.isDefined, s"Partition $partitionId should exist") - val partition = partitionOpt.get - - assertEquals(Optional.of(expectedLeaderId), partition.leaderId, "Partition leader id should match") - assertEquals(expectedReplicas, partition.replicaIds.asScala.toSet, "Replica set should match") + assertEquals(3, topicMetadata.partitionMetadata.size) + for (partition <- topicMetadata.partitionMetadata.asScala) { + val replicas = partition.replicaIds.asScala.toSet + assertEquals(2, replicas.size, s"Partition ${partition.partition} should have 2 replicas") + assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3") + assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader") + assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas") + } } }