From 1edc30bf30937a36310eb5870de2c8fe09e622cc Mon Sep 17 00:00:00 2001 From: xijiu <422766572@qq.com> Date: Tue, 25 Feb 2025 18:15:34 +0800 Subject: [PATCH] KAFKA-17836 Move RackAwareTest to server module (#19021) Reviewers: Chia-Ping Tsai --- checkstyle/import-control-core.xml | 1 + .../admin/RackAwareAutoTopicCreationTest.java | 152 ++++++++++++++++++ .../api/RackAwareAutoTopicCreationTest.scala | 91 ----------- .../unit/kafka/admin/RackAwareTest.scala | 94 ----------- 4 files changed, 153 insertions(+), 185 deletions(-) create mode 100644 core/src/test/java/kafka/admin/RackAwareAutoTopicCreationTest.java delete mode 100644 core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala delete mode 100644 core/src/test/scala/unit/kafka/admin/RackAwareTest.scala diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml index e2f8aaf7435..66fca97f3d4 100644 --- a/checkstyle/import-control-core.xml +++ b/checkstyle/import-control-core.xml @@ -157,6 +157,7 @@ + diff --git a/core/src/test/java/kafka/admin/RackAwareAutoTopicCreationTest.java b/core/src/test/java/kafka/admin/RackAwareAutoTopicCreationTest.java new file mode 100644 index 00000000000..53ac49ca666 --- /dev/null +++ b/core/src/test/java/kafka/admin/RackAwareAutoTopicCreationTest.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin; + +import org.apache.kafka.admin.BrokerMetadata; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfigProperty; +import org.apache.kafka.common.test.api.ClusterTest; +import org.apache.kafka.common.test.api.ClusterTestDefaults; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.config.ServerLogConfigs; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import scala.collection.Iterator; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@ClusterTestDefaults(types = {Type.KRAFT}, + brokers = 4, + serverProperties = { + @ClusterConfigProperty(key = ServerLogConfigs.NUM_PARTITIONS_CONFIG, value = "8"), + @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "2"), + @ClusterConfigProperty(id = 0, key = ServerConfigs.BROKER_RACK_CONFIG, value = "0"), + @ClusterConfigProperty(id = 1, key = ServerConfigs.BROKER_RACK_CONFIG, value = "0"), + @ClusterConfigProperty(id = 2, key = ServerConfigs.BROKER_RACK_CONFIG, value = "1"), + @ClusterConfigProperty(id = 3, key = ServerConfigs.BROKER_RACK_CONFIG, value = "1"), + }) +public class RackAwareAutoTopicCreationTest { + + private static final String TOPIC = "topic"; + + @ClusterTest + public void testAutoCreateTopic(ClusterInstance cluster) throws Exception { + + try (Admin admin = cluster.admin(); + Producer producer = cluster.producer()) { + + // send a record to trigger auto create topic + ProducerRecord record = new ProducerRecord<>(TOPIC, null, "key".getBytes(), "value".getBytes()); + assertEquals(0L, producer.send(record).get().offset(), "Should have offset 0"); + + // check broker rack content + Map expectedBrokerToRackMap = Map.of( + 0, "0", + 1, "0", + 2, "1", + 3, "1" + ); + Map actualBrokerToRackMap = getBrokerToRackMap(cluster); + assertEquals(expectedBrokerToRackMap, actualBrokerToRackMap); + + // get topic assignments and check it's content + Map> assignments = getTopicAssignment(admin); + for (List brokerList : assignments.values()) { + assertEquals(new HashSet<>(brokerList).size(), brokerList.size(), + "More than one replica is assigned to same broker for the same partition"); + } + + // check rack count for each partition + ReplicaDistributions distribution = getReplicaDistribution(assignments, expectedBrokerToRackMap); + Map serverProperties = cluster.config().serverProperties(); + int numPartition = Integer.parseInt(serverProperties.get(ServerLogConfigs.NUM_PARTITIONS_CONFIG)); + int replicationFactor = Integer.parseInt(serverProperties.get(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG)); + List expectedRackCounts = Collections.nCopies(numPartition, replicationFactor); + List actualRackCounts = distribution.partitionToRackMap.values().stream() + .map(racks -> (int) racks.stream().distinct().count()) + .collect(Collectors.toList()); + assertEquals(expectedRackCounts, actualRackCounts, + "More than one replica of the same partition is assigned to the same rack"); + + // check replica count for each partition + int numBrokers = cluster.brokers().size(); + int numReplicasPerBroker = numPartition * replicationFactor / numBrokers; + List expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker); + List actualReplicasCounts = new ArrayList<>(distribution.partitionToCountMap.values()); + assertEquals(expectedReplicasCounts, actualReplicasCounts, "Replica count is not even for broker"); + } + } + + private static Map> getTopicAssignment(Admin admin) throws Exception { + TopicDescription topicDescription = admin.describeTopics(List.of(TOPIC)).allTopicNames().get().get(TOPIC); + return topicDescription.partitions().stream() + .collect(Collectors.toMap( + TopicPartitionInfo::partition, + p -> p.replicas().stream().map(Node::id).collect(Collectors.toList()))); + } + + private static Map getBrokerToRackMap(ClusterInstance cluster) { + Map actualBrokerToRackMap = new HashMap<>(); + Iterator iterator = cluster.brokers().get(0).metadataCache().getAliveBrokers().iterator(); + while (iterator.hasNext()) { + BrokerMetadata metadata = iterator.next(); + actualBrokerToRackMap.put(metadata.id, metadata.rack.get()); + } + return actualBrokerToRackMap; + } + + + private static ReplicaDistributions getReplicaDistribution(Map> assignment, + Map brokerRackMapping) { + Map> partitionToRackMap = new HashMap<>(); + Map partitionToCountMap = new HashMap<>(); + + for (Map.Entry> entry : assignment.entrySet()) { + int partitionId = entry.getKey(); + List replicaList = entry.getValue(); + + for (int brokerId : replicaList) { + partitionToCountMap.put(brokerId, partitionToCountMap.getOrDefault(brokerId, 0) + 1); + String rack = brokerRackMapping.get(brokerId); + if (rack == null) { + throw new RuntimeException("No mapping found for " + brokerId + " in `brokerRackMapping`"); + } + partitionToRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack); + } + } + return new ReplicaDistributions(partitionToRackMap, partitionToCountMap); + } + + private record ReplicaDistributions(Map> partitionToRackMap, + Map partitionToCountMap) { + } +} diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala deleted file mode 100644 index 03a312d5f07..00000000000 --- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.api - -import java.util.Properties -import kafka.admin.RackAwareTest -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig -import kafka.utils.{TestInfoUtils, TestUtils} -import org.apache.kafka.clients.admin.Admin -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs} -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.MethodSource - -import scala.collection.Map -import scala.jdk.CollectionConverters.ListHasAsScala - -class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest { - val numServers = 4 - val numPartitions = 8 - val replicationFactor = 2 - val overridingProps = new Properties() - var admin: Admin = _ - overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numPartitions.toString) - overridingProps.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, replicationFactor.toString) - - def generateConfigs = - (0 until numServers) map { node => - TestUtils.createBrokerConfig(node, enableControlledShutdown = false, rack = Some((node / 2).toString)) - } map (KafkaConfig.fromProps(_, overridingProps)) - - private val topic = "topic" - - @BeforeEach - override def setUp(testInfo: TestInfo): Unit = { - super.setUp(testInfo) - admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) - } - - @AfterEach - override def tearDown(): Unit = { - if (admin != null) admin.close() - super.tearDown() - } - - @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) - @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) - def testAutoCreateTopic(quorum: String, groupProtocol: String): Unit = { - val producer = TestUtils.createProducer(bootstrapServers()) - try { - // Send a message to auto-create the topic - val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes) - assertEquals(0L, producer.send(record).get.offset, "Should have offset 0") - - // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0) - val assignment = getReplicaAssignment(topic) - val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers() - val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1") - assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap) - checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor, - verifyLeaderDistribution = false) - } finally producer.close() - } - - private def getReplicaAssignment(topic: String): Map[Int, Seq[Int]] = { - TestUtils.describeTopic(admin, topic).partitions.asScala.map { partition => - partition.partition -> partition.replicas.asScala.map(_.id).toSeq - }.toMap - } -} - diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala deleted file mode 100644 index 62df5227374..00000000000 --- a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package kafka.admin - -import org.apache.kafka.admin.BrokerMetadata - -import scala.collection.{Map, Seq, mutable} -import org.junit.jupiter.api.Assertions._ - -import java.util -import java.util.Optional -import scala.jdk.CollectionConverters._ - -trait RackAwareTest { - - def checkReplicaDistribution(assignment: Map[Int, Seq[Int]], - brokerRackMapping: Map[Int, String], - numBrokers: Int, - numPartitions: Int, - replicationFactor: Int, - verifyRackAware: Boolean = true, - verifyLeaderDistribution: Boolean = true, - verifyReplicasDistribution: Boolean = true): Unit = { - // always verify that no broker will be assigned for more than one replica - for ((_, brokerList) <- assignment) { - assertEquals(brokerList.toSet.size, brokerList.size, - "More than one replica is assigned to same broker for the same partition") - } - val distribution = getReplicaDistribution(assignment, brokerRackMapping) - - if (verifyRackAware) { - val partitionRackMap = distribution.partitionRacks - assertEquals(List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size), - "More than one replica of the same partition is assigned to the same rack") - } - - if (verifyLeaderDistribution) { - val leaderCount = distribution.brokerLeaderCount - val leaderCountPerBroker = numPartitions / numBrokers - assertEquals(List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList, - "Preferred leader count is not even for brokers") - } - - if (verifyReplicasDistribution) { - val replicasCount = distribution.brokerReplicasCount - val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers - assertEquals(List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList, - "Replica count is not even for broker") - } - } - - def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = { - val leaderCount = mutable.Map[Int, Int]() - val partitionCount = mutable.Map[Int, Int]() - val partitionRackMap = mutable.Map[Int, List[String]]() - assignment.foreach { case (partitionId, replicaList) => - val leader = replicaList.head - leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1 - for (brokerId <- replicaList) { - partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1 - val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`")) - partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List()) - } - } - ReplicaDistributions(partitionRackMap, leaderCount, partitionCount) - } - - def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): util.Collection[BrokerMetadata] = { - val res = rackMap.toSeq.map { case (brokerId, rack) => - new BrokerMetadata(brokerId, Optional.ofNullable(rack)) - } ++ brokersWithoutRack.map { brokerId => - new BrokerMetadata(brokerId, Optional.empty()) - }.sortBy(_.id) - - res.asJavaCollection - } - -} - -case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])