KAFKA-18098 add kraft support to testReplicaPlacementAllServers and testReplicaPlacementPartialServers (#17955)

Reviewers: Yung <yungyung7654321@gmail.com>, Colin P. McCabe <cmccabe@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
Peter Lee 2024-11-28 05:21:47 +08:00 committed by GitHub
parent d334f60944
commit b08b64c2d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 20 additions and 23 deletions

View File

@ -17,14 +17,13 @@
package kafka.admin
import java.util.{Collections, Optional}
import java.util.Collections
import kafka.controller.ReplicaAssignment
import kafka.server.{BaseRequestTest, BrokerServer}
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import org.apache.kafka.clients.admin.{Admin, NewPartitions, NewTopic}
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
import org.apache.kafka.common.requests.{MetadataRequest, MetadataResponse}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{BeforeEach, TestInfo}
@ -178,7 +177,7 @@ class AddPartitionsTest extends BaseRequestTest {
}
@ParameterizedTest
@ValueSource(strings = Array("zk")) // TODO: add kraft support
@ValueSource(strings = Array("kraft"))
def testReplicaPlacementAllServers(quorum: String): Unit = {
admin.createPartitions(Collections.singletonMap(topic3, NewPartitions.increaseTo(7))).all().get()
@ -194,17 +193,19 @@ class AddPartitionsTest extends BaseRequestTest {
new MetadataRequest.Builder(Seq(topic3).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
val topicMetadata = response.topicMetadata.asScala.head
validateLeaderAndReplicas(topicMetadata, 0, 2, Set(2, 3, 0, 1))
validateLeaderAndReplicas(topicMetadata, 1, 3, Set(3, 2, 0, 1))
validateLeaderAndReplicas(topicMetadata, 2, 0, Set(0, 3, 1, 2))
validateLeaderAndReplicas(topicMetadata, 3, 1, Set(1, 0, 2, 3))
validateLeaderAndReplicas(topicMetadata, 4, 2, Set(2, 3, 0, 1))
validateLeaderAndReplicas(topicMetadata, 5, 3, Set(3, 0, 1, 2))
validateLeaderAndReplicas(topicMetadata, 6, 0, Set(0, 1, 2, 3))
assertEquals(7, topicMetadata.partitionMetadata.size)
for (partition <- topicMetadata.partitionMetadata.asScala) {
val replicas = partition.replicaIds.asScala.toSet
assertEquals(4, replicas.size, s"Partition ${partition.partition} should have 4 replicas")
assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3")
assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader")
assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas")
}
}
@ParameterizedTest
@ValueSource(strings = Array("zk")) // TODO: add kraft support
@ValueSource(strings = Array("kraft"))
def testReplicaPlacementPartialServers(quorum: String): Unit = {
admin.createPartitions(Collections.singletonMap(topic2, NewPartitions.increaseTo(3))).all().get()
@ -216,19 +217,15 @@ class AddPartitionsTest extends BaseRequestTest {
new MetadataRequest.Builder(Seq(topic2).asJava, false).build)
assertEquals(1, response.topicMetadata.size)
val topicMetadata = response.topicMetadata.asScala.head
validateLeaderAndReplicas(topicMetadata, 0, 1, Set(1, 2))
validateLeaderAndReplicas(topicMetadata, 1, 2, Set(0, 2))
validateLeaderAndReplicas(topicMetadata, 2, 3, Set(1, 3))
}
def validateLeaderAndReplicas(metadata: TopicMetadata, partitionId: Int, expectedLeaderId: Int,
expectedReplicas: Set[Int]): Unit = {
val partitionOpt = metadata.partitionMetadata.asScala.find(_.partition == partitionId)
assertTrue(partitionOpt.isDefined, s"Partition $partitionId should exist")
val partition = partitionOpt.get
assertEquals(Optional.of(expectedLeaderId), partition.leaderId, "Partition leader id should match")
assertEquals(expectedReplicas, partition.replicaIds.asScala.toSet, "Replica set should match")
assertEquals(3, topicMetadata.partitionMetadata.size)
for (partition <- topicMetadata.partitionMetadata.asScala) {
val replicas = partition.replicaIds.asScala.toSet
assertEquals(2, replicas.size, s"Partition ${partition.partition} should have 2 replicas")
assertTrue(replicas.subsetOf(Set(0, 1, 2, 3)), s"Replicas should only include brokers 0-3")
assertTrue(partition.leaderId.isPresent, s"Partition ${partition.partition} should have a leader")
assertTrue(replicas.contains(partition.leaderId.get), "Leader should be one of the replicas")
}
}
}