diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index e2f8aaf7435..66fca97f3d4 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -157,6 +157,7 @@
+
diff --git a/core/src/test/java/kafka/admin/RackAwareAutoTopicCreationTest.java b/core/src/test/java/kafka/admin/RackAwareAutoTopicCreationTest.java
new file mode 100644
index 00000000000..53ac49ca666
--- /dev/null
+++ b/core/src/test/java/kafka/admin/RackAwareAutoTopicCreationTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.api.ClusterConfigProperty;
+import org.apache.kafka.common.test.api.ClusterTest;
+import org.apache.kafka.common.test.api.ClusterTestDefaults;
+import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import scala.collection.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@ClusterTestDefaults(types = {Type.KRAFT},
+ brokers = 4,
+ serverProperties = {
+ @ClusterConfigProperty(key = ServerLogConfigs.NUM_PARTITIONS_CONFIG, value = "8"),
+ @ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "2"),
+ @ClusterConfigProperty(id = 0, key = ServerConfigs.BROKER_RACK_CONFIG, value = "0"),
+ @ClusterConfigProperty(id = 1, key = ServerConfigs.BROKER_RACK_CONFIG, value = "0"),
+ @ClusterConfigProperty(id = 2, key = ServerConfigs.BROKER_RACK_CONFIG, value = "1"),
+ @ClusterConfigProperty(id = 3, key = ServerConfigs.BROKER_RACK_CONFIG, value = "1"),
+ })
+public class RackAwareAutoTopicCreationTest {
+
+ private static final String TOPIC = "topic";
+
+ @ClusterTest
+ public void testAutoCreateTopic(ClusterInstance cluster) throws Exception {
+
+ try (Admin admin = cluster.admin();
+ Producer producer = cluster.producer()) {
+
+ // send a record to trigger auto create topic
+ ProducerRecord record = new ProducerRecord<>(TOPIC, null, "key".getBytes(), "value".getBytes());
+ assertEquals(0L, producer.send(record).get().offset(), "Should have offset 0");
+
+ // check broker rack content
+ Map expectedBrokerToRackMap = Map.of(
+ 0, "0",
+ 1, "0",
+ 2, "1",
+ 3, "1"
+ );
+ Map actualBrokerToRackMap = getBrokerToRackMap(cluster);
+ assertEquals(expectedBrokerToRackMap, actualBrokerToRackMap);
+
+ // get topic assignments and check it's content
+ Map> assignments = getTopicAssignment(admin);
+ for (List brokerList : assignments.values()) {
+ assertEquals(new HashSet<>(brokerList).size(), brokerList.size(),
+ "More than one replica is assigned to same broker for the same partition");
+ }
+
+ // check rack count for each partition
+ ReplicaDistributions distribution = getReplicaDistribution(assignments, expectedBrokerToRackMap);
+ Map serverProperties = cluster.config().serverProperties();
+ int numPartition = Integer.parseInt(serverProperties.get(ServerLogConfigs.NUM_PARTITIONS_CONFIG));
+ int replicationFactor = Integer.parseInt(serverProperties.get(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG));
+ List expectedRackCounts = Collections.nCopies(numPartition, replicationFactor);
+ List actualRackCounts = distribution.partitionToRackMap.values().stream()
+ .map(racks -> (int) racks.stream().distinct().count())
+ .collect(Collectors.toList());
+ assertEquals(expectedRackCounts, actualRackCounts,
+ "More than one replica of the same partition is assigned to the same rack");
+
+ // check replica count for each partition
+ int numBrokers = cluster.brokers().size();
+ int numReplicasPerBroker = numPartition * replicationFactor / numBrokers;
+ List expectedReplicasCounts = Collections.nCopies(numBrokers, numReplicasPerBroker);
+ List actualReplicasCounts = new ArrayList<>(distribution.partitionToCountMap.values());
+ assertEquals(expectedReplicasCounts, actualReplicasCounts, "Replica count is not even for broker");
+ }
+ }
+
+ private static Map> getTopicAssignment(Admin admin) throws Exception {
+ TopicDescription topicDescription = admin.describeTopics(List.of(TOPIC)).allTopicNames().get().get(TOPIC);
+ return topicDescription.partitions().stream()
+ .collect(Collectors.toMap(
+ TopicPartitionInfo::partition,
+ p -> p.replicas().stream().map(Node::id).collect(Collectors.toList())));
+ }
+
+ private static Map getBrokerToRackMap(ClusterInstance cluster) {
+ Map actualBrokerToRackMap = new HashMap<>();
+ Iterator iterator = cluster.brokers().get(0).metadataCache().getAliveBrokers().iterator();
+ while (iterator.hasNext()) {
+ BrokerMetadata metadata = iterator.next();
+ actualBrokerToRackMap.put(metadata.id, metadata.rack.get());
+ }
+ return actualBrokerToRackMap;
+ }
+
+
+ private static ReplicaDistributions getReplicaDistribution(Map> assignment,
+ Map brokerRackMapping) {
+ Map> partitionToRackMap = new HashMap<>();
+ Map partitionToCountMap = new HashMap<>();
+
+ for (Map.Entry> entry : assignment.entrySet()) {
+ int partitionId = entry.getKey();
+ List replicaList = entry.getValue();
+
+ for (int brokerId : replicaList) {
+ partitionToCountMap.put(brokerId, partitionToCountMap.getOrDefault(brokerId, 0) + 1);
+ String rack = brokerRackMapping.get(brokerId);
+ if (rack == null) {
+ throw new RuntimeException("No mapping found for " + brokerId + " in `brokerRackMapping`");
+ }
+ partitionToRackMap.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(rack);
+ }
+ }
+ return new ReplicaDistributions(partitionToRackMap, partitionToCountMap);
+ }
+
+ private record ReplicaDistributions(Map> partitionToRackMap,
+ Map partitionToCountMap) {
+ }
+}
diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
deleted file mode 100644
index 03a312d5f07..00000000000
--- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.api
-
-import java.util.Properties
-import kafka.admin.RackAwareTest
-import kafka.integration.KafkaServerTestHarness
-import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
-import org.apache.kafka.clients.admin.Admin
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
-
-import scala.collection.Map
-import scala.jdk.CollectionConverters.ListHasAsScala
-
-class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest {
- val numServers = 4
- val numPartitions = 8
- val replicationFactor = 2
- val overridingProps = new Properties()
- var admin: Admin = _
- overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numPartitions.toString)
- overridingProps.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, replicationFactor.toString)
-
- def generateConfigs =
- (0 until numServers) map { node =>
- TestUtils.createBrokerConfig(node, enableControlledShutdown = false, rack = Some((node / 2).toString))
- } map (KafkaConfig.fromProps(_, overridingProps))
-
- private val topic = "topic"
-
- @BeforeEach
- override def setUp(testInfo: TestInfo): Unit = {
- super.setUp(testInfo)
- admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
- }
-
- @AfterEach
- override def tearDown(): Unit = {
- if (admin != null) admin.close()
- super.tearDown()
- }
-
- @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
- def testAutoCreateTopic(quorum: String, groupProtocol: String): Unit = {
- val producer = TestUtils.createProducer(bootstrapServers())
- try {
- // Send a message to auto-create the topic
- val record = new ProducerRecord(topic, null, "key".getBytes, "value".getBytes)
- assertEquals(0L, producer.send(record).get.offset, "Should have offset 0")
-
- // double check that the topic is created with leader elected
- TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
- val assignment = getReplicaAssignment(topic)
- val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers()
- val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
- assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap)
- checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor,
- verifyLeaderDistribution = false)
- } finally producer.close()
- }
-
- private def getReplicaAssignment(topic: String): Map[Int, Seq[Int]] = {
- TestUtils.describeTopic(admin, topic).partitions.asScala.map { partition =>
- partition.partition -> partition.replicas.asScala.map(_.id).toSeq
- }.toMap
- }
-}
-
diff --git a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala b/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
deleted file mode 100644
index 62df5227374..00000000000
--- a/core/src/test/scala/unit/kafka/admin/RackAwareTest.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import org.apache.kafka.admin.BrokerMetadata
-
-import scala.collection.{Map, Seq, mutable}
-import org.junit.jupiter.api.Assertions._
-
-import java.util
-import java.util.Optional
-import scala.jdk.CollectionConverters._
-
-trait RackAwareTest {
-
- def checkReplicaDistribution(assignment: Map[Int, Seq[Int]],
- brokerRackMapping: Map[Int, String],
- numBrokers: Int,
- numPartitions: Int,
- replicationFactor: Int,
- verifyRackAware: Boolean = true,
- verifyLeaderDistribution: Boolean = true,
- verifyReplicasDistribution: Boolean = true): Unit = {
- // always verify that no broker will be assigned for more than one replica
- for ((_, brokerList) <- assignment) {
- assertEquals(brokerList.toSet.size, brokerList.size,
- "More than one replica is assigned to same broker for the same partition")
- }
- val distribution = getReplicaDistribution(assignment, brokerRackMapping)
-
- if (verifyRackAware) {
- val partitionRackMap = distribution.partitionRacks
- assertEquals(List.fill(numPartitions)(replicationFactor), partitionRackMap.values.toList.map(_.distinct.size),
- "More than one replica of the same partition is assigned to the same rack")
- }
-
- if (verifyLeaderDistribution) {
- val leaderCount = distribution.brokerLeaderCount
- val leaderCountPerBroker = numPartitions / numBrokers
- assertEquals(List.fill(numBrokers)(leaderCountPerBroker), leaderCount.values.toList,
- "Preferred leader count is not even for brokers")
- }
-
- if (verifyReplicasDistribution) {
- val replicasCount = distribution.brokerReplicasCount
- val numReplicasPerBroker = numPartitions * replicationFactor / numBrokers
- assertEquals(List.fill(numBrokers)(numReplicasPerBroker), replicasCount.values.toList,
- "Replica count is not even for broker")
- }
- }
-
- def getReplicaDistribution(assignment: Map[Int, Seq[Int]], brokerRackMapping: Map[Int, String]): ReplicaDistributions = {
- val leaderCount = mutable.Map[Int, Int]()
- val partitionCount = mutable.Map[Int, Int]()
- val partitionRackMap = mutable.Map[Int, List[String]]()
- assignment.foreach { case (partitionId, replicaList) =>
- val leader = replicaList.head
- leaderCount(leader) = leaderCount.getOrElse(leader, 0) + 1
- for (brokerId <- replicaList) {
- partitionCount(brokerId) = partitionCount.getOrElse(brokerId, 0) + 1
- val rack = brokerRackMapping.getOrElse(brokerId, sys.error(s"No mapping found for $brokerId in `brokerRackMapping`"))
- partitionRackMap(partitionId) = rack :: partitionRackMap.getOrElse(partitionId, List())
- }
- }
- ReplicaDistributions(partitionRackMap, leaderCount, partitionCount)
- }
-
- def toBrokerMetadata(rackMap: Map[Int, String], brokersWithoutRack: Seq[Int] = Seq.empty): util.Collection[BrokerMetadata] = {
- val res = rackMap.toSeq.map { case (brokerId, rack) =>
- new BrokerMetadata(brokerId, Optional.ofNullable(rack))
- } ++ brokersWithoutRack.map { brokerId =>
- new BrokerMetadata(brokerId, Optional.empty())
- }.sortBy(_.id)
-
- res.asJavaCollection
- }
-
-}
-
-case class ReplicaDistributions(partitionRacks: Map[Int, Seq[String]], brokerLeaderCount: Map[Int, Int], brokerReplicasCount: Map[Int, Int])