KAFKA-17836 Move RackAwareTest to server module (#19021)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
This commit is contained in:
xijiu 2025-02-25 18:15:34 +08:00 committed by GitHub
parent 1c82b89b4c
commit 1edc30bf30
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 153 additions and 185 deletions

View File

@ -157,6 +157,7 @@
<allow pkg="org.apache.kafka.common.test"/> <allow pkg="org.apache.kafka.common.test"/>
<allow pkg="org.apache.kafka.common.test.api"/> <allow pkg="org.apache.kafka.common.test.api"/>
<allow pkg="org.apache.kafka.common.test.api"/> <allow pkg="org.apache.kafka.common.test.api"/>
<allow pkg="org.apache.kafka.admin"/>
</subpackage> </subpackage>
<subpackage name="security"> <subpackage name="security">

View File

@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin;
import org.apache.kafka.admin.BrokerMetadata;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.server.config.ServerLogConfigs;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import scala.collection.Iterator;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ClusterTestDefaults(types = {Type.KRAFT},
brokers = 4,
serverProperties = {
@ClusterConfigProperty(key = ServerLogConfigs.NUM_PARTITIONS_CONFIG, value = "8"),
@ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "2"),
@ClusterConfigProperty(id = 0, key = ServerConfigs.BROKER_RACK_CONFIG, value = "0"),
@ClusterConfigProperty(id = 1, key = ServerConfigs.BROKER_RACK_CONFIG, value = "0"),
@ClusterConfigProperty(id = 2, key = ServerConfigs.BROKER_RACK_CONFIG, value = "1"),
@ClusterConfigProperty(id = 3, key = ServerConfigs.BROKER_RACK_CONFIG, value = "1"),
})
public class RackAwareAutoTopicCreationTest {
private static final String TOPIC = "topic";
@ClusterTest
public void testAutoCreateTopic(ClusterInstance cluster) throws Exception {
try (Admin admin = cluster.admin();
Producer<byte[], byte[]> producer = cluster.producer()) {
// send a record to trigger auto create topic
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(TOPIC, null, "key".getBytes(), "value".getBytes());
assertEquals(0L, producer.send(record).get().offset(), "Should have offset 0");
// check broker rack content
Map<Integer, String> expectedBrokerToRackMap = Map.of(
0, "0",
1, "0",
2, "1",
3, "1"
);
Map<Integer, String> actualBrokerToRackMap = getBrokerToRackMap(cluster);
assertEquals(expectedBrokerToRackMap, actualBrokerToRackMap);
// get topic assignments and check it's content
Map<Integer, List<Integer>> assignments = getTopicAssignment(admin);
for (List<Integer> brokerList : assignments.values()) {
assertEquals(new HashSet<>(brokerList).size(), brokerList.size(),
"More than one replica is assigned to same broker for the same partition");
}
// check rack count for each partition
ReplicaDistributions distribution = getReplicaDistribution(assignments, expectedBrokerToRackMap);
Map<String, String> serverProperties = cluster.config().serverProperties();
int numPartition = Integer.parseInt(serverProperties.get(ServerLogConfigs.NUM_PARTITIONS_CONFIG));
int replicationFactor = Integer.parseInt(serverProperties.get(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG));
List<Integer> expectedRackCounts = Collections.nCopies(numPartition, replicationFactor);
List<Integer> actualRackCounts = distribution.partitionToRackMap.values().stream()
.map(racks -> (int) racks.stream().distinct().count())
.collect(Collectors.toList());
assertEquals(expectedRackCounts, actualRackCounts,
"More than one replica of the same partition is assigned to the same rack");
// check replica count for each partition
int numBrokers = cluster.brokers().size();
int numReplicasPerBroker = numPartition * replicationFactor / numBrokers;
List<Integer> expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker);
List<Integer> actualReplicasCounts = new ArrayList<>(distribution.partitionToCountMap.values());
assertEquals(expectedReplicasCounts, actualReplicasCounts, "Replica count is not even for broker");
}
}
private static Map<Integer, List<Integer>> getTopicAssignment(Admin admin) throws Exception {
TopicDescription topicDescription = admin.describeTopics(List.of(TOPIC)).allTopicNames().get().get(TOPIC);
return topicDescription.partitions().stream()
.collect(Collectors.toMap(
TopicPartitionInfo::partition,
p -> p.replicas().stream().map(Node::id).collect(Collectors.toList())));
}
private static Map<Integer, String> getBrokerToRackMap(ClusterInstance cluster) {
Map<Integer, String> actualBrokerToRackMap = new HashMap<>();
Iterator<BrokerMetadata> iterator = cluster.brokers().get(0).metadataCache().getAliveBrokers().iterator();
while (iterator.hasNext()) {
BrokerMetadata metadata = iterator.next();
actualBrokerToRackMap.put(metadata.id, metadata.rack.get());
}
return actualBrokerToRackMap;
}
private static ReplicaDistributions getReplicaDistribution(Map<Integer, List<Integer>> assignment,
Map<Integer, String> brokerRackMapping) {
Map<Integer, List<String>> partitionToRackMap = new HashMap<>();
Map<Integer, Integer> partitionToCountMap = new HashMap<>();
for (Map.Entry<Integer, List<Integer>> entry : assignment.entrySet()) {
int partitionId = entry.getKey();
List<Integer> replicaList = entry.getValue();
for (int brokerId : replicaList) {
partitionToCountMap.put(brokerId, partitionToCountMap.getOrDefault(brokerId, 0) + 1);
String rack = brokerRackMapping.get(brokerId);
if (rack == null) {
throw new RuntimeException("No mapping found for " + brokerId + " in `brokerRackMapping`");
}
partitionToRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack);
}
}
return new ReplicaDistributions(partitionToRackMap, partitionToCountMap);
}
private record ReplicaDistributions(Map<Integer, List<String>> partitionToRackMap,
Map<Integer, Integer> partitionToCountMap) {
}
}

View File

@ -1,91 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.api
import java.util.Properties
import kafka.admin.RackAwareTest
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin.Admin
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
import scala.collection.Map
import scala.jdk.CollectionConverters.ListHasAsScala
class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest {
val numServers = 4
val numPartitions = 8
val replicationFactor = 2
val overridingProps = new Properties()
var admin: Admin = _
overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numPartitions.toString)
overridingProps.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, replicationFactor.toString)
def generateConfigs =
(0 until numServers) map { node =>
TestUtils.createBrokerConfig(node, enableControlledShutdown = false, rack = Some((node / 2).toString))
} map (KafkaConfig.fromProps(_, overridingProps))
private val topic = "topic"
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
}
@AfterEach
override def tearDown(): Unit = {
if (admin != null) admin.close()
super.tearDown()
}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testAutoCreateTopic(quorum: String, groupProtocol: String): Unit = {
val producer = TestUtils.createProducer(bootstrapServers())
try {
// Send a message to auto-create the topic
val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
// double check that the topic is created with leader elected
TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
val assignment = getReplicaAssignment(topic)
val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers()
val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor,
verifyLeaderDistribution = false)
} finally producer.close()
}
private def getReplicaAssignment(topic: String): Map[Int, Seq[Int]] = {
TestUtils.describeTopic(admin, topic).partitions.asScala.map { partition =>
partition.partition -> partition.replicas.asScala.map(_.id).toSeq
}.toMap
}
}

View File

@ -1,94 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import org.apache.kafka.admin.BrokerMetadata
import scala.collection.{Map, Seq, mutable}
import org.junit.jupiter.api.Assertions._
import java.util
import java.util.Optional
import scala.jdk.CollectionConverters._
trait RackAwareTest {
def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
brokerRackMapping: Map[Int, String],
numBrokers: Int,
numPartitions: Int,
replicationFactor: Int,
verifyRackAware: Boolean = true,
verifyLeaderDistribution: Boolean = true,
verifyReplicasDistribution: Boolean = true): Unit = {
// always verify that no broker will be assigned for more than one replica
for ((_, brokerList) <- assignment) {
assertEquals(brokerList.toSet.size, brokerList.size,
"More than one replica is assigned to same broker for the same partition")
}
val distribution = getReplicaDistribution(assignment, brokerRackMapping)
if (verifyRackAware) {
val partitionRackMap = distribution.partitionRacks
assertEquals(List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size),
"More than one replica of the same partition is assigned to the same rack")
}
if (verifyLeaderDistribution) {
val leaderCount = distribution.brokerLeaderCount
val leaderCountPerBroker = numPartitions / numBrokers
assertEquals(List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList,
"Preferred leader count is not even for brokers")
}
if (verifyReplicasDistribution) {
val replicasCount = distribution.brokerReplicasCount
val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers
assertEquals(List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList,
"Replica count is not even for broker")
}
}
def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = {
val leaderCount = mutable.Map[Int, Int]()
val partitionCount = mutable.Map[Int, Int]()
val partitionRackMap = mutable.Map[Int, List[String]]()
assignment.foreach { case (partitionId, replicaList) =>
val leader = replicaList.head
leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1
for (brokerId <- replicaList) {
partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1
val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`"))
partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List())
}
}
ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
}
def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): util.Collection[BrokerMetadata] = {
val res = rackMap.toSeq.map { case (brokerId, rack) =>
new BrokerMetadata(brokerId, Optional.ofNullable(rack))
} ++ brokersWithoutRack.map { brokerId =>
new BrokerMetadata(brokerId, Optional.empty())
}.sortBy(_.id)
res.asJavaCollection
}
}
case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])