From 5efaae65c6b33af034a368d71ebf68e10e4286ba Mon Sep 17 00:00:00 2001 From: Logan Zhu Date: Thu, 9 Jan 2025 02:52:28 +0800 Subject: [PATCH] KAFKA-18432 Remove unused code from AutoTopicCreationManager (#18438) Reviewers: Chia-Ping Tsai --- .../server/AutoTopicCreationManager.scala | 85 +------------------ .../scala/kafka/server/BrokerServer.scala | 4 +- 2 files changed, 5 insertions(+), 84 deletions(-) diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala index 58b3035935c..e3abde0bda4 100644 --- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala +++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala @@ -18,9 +18,7 @@ package kafka.server import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.AtomicReference import java.util.{Collections, Properties} -import kafka.controller.KafkaController import kafka.coordinator.transaction.TransactionCoordinator import kafka.utils.Logging import org.apache.kafka.clients.ClientResponse @@ -31,7 +29,7 @@ import org.apache.kafka.common.message.CreateTopicsRequestData import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicConfig, CreatableTopicConfigCollection} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{ApiError, CreateTopicsRequest, RequestContext, RequestHeader} +import org.apache.kafka.common.requests.{CreateTopicsRequest, RequestContext, RequestHeader} import org.apache.kafka.coordinator.group.GroupCoordinator import org.apache.kafka.coordinator.share.ShareCoordinator import org.apache.kafka.server.common.{ControllerRequestCompletionHandler, NodeToControllerChannelManager} @@ -49,34 +47,13 @@ trait AutoTopicCreationManager { ): Seq[MetadataResponseTopic] } -object AutoTopicCreationManager { - - def apply( - config: KafkaConfig, - channelManager: Option[NodeToControllerChannelManager], - adminManager: Option[ZkAdminManager], - controller: Option[KafkaController], - groupCoordinator: GroupCoordinator, - txnCoordinator: TransactionCoordinator, - shareCoordinator: Option[ShareCoordinator], - ): AutoTopicCreationManager = { - new DefaultAutoTopicCreationManager(config, channelManager, adminManager, - controller, groupCoordinator, txnCoordinator, shareCoordinator) - } -} - class DefaultAutoTopicCreationManager( config: KafkaConfig, - channelManager: Option[NodeToControllerChannelManager], - adminManager: Option[ZkAdminManager], - controller: Option[KafkaController], + channelManager: NodeToControllerChannelManager, groupCoordinator: GroupCoordinator, txnCoordinator: TransactionCoordinator, shareCoordinator: Option[ShareCoordinator] ) extends AutoTopicCreationManager with Logging { - if (controller.isEmpty && channelManager.isEmpty) { - throw new IllegalArgumentException("Must supply a channel manager if not supplying a controller") - } private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]()) @@ -99,65 +76,13 @@ class DefaultAutoTopicCreationManager( val creatableTopicResponses = if (creatableTopics.isEmpty) { Seq.empty - } else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) { - sendCreateTopicRequest(creatableTopics, metadataRequestContext) } else { - createTopicsInZk(creatableTopics, controllerMutationQuota) + sendCreateTopicRequest(creatableTopics, metadataRequestContext) } uncreatableTopicResponses ++ creatableTopicResponses } - private def createTopicsInZk( - creatableTopics: Map[String, CreatableTopic], - controllerMutationQuota: ControllerMutationQuota - ): Seq[MetadataResponseTopic] = { - val topicErrors = new AtomicReference[Map[String, ApiError]]() - try { - // Note that we use timeout = 0 since we do not need to wait for metadata propagation - // and we want to get the response error immediately. - adminManager.get.createTopics( - timeout = 0, - validateOnly = false, - creatableTopics, - Map.empty, - controllerMutationQuota, - topicErrors.set - ) - - val creatableTopicResponses = Option(topicErrors.get) match { - case Some(errors) => - errors.toSeq.map { case (topic, apiError) => - val error = apiError.error match { - case Errors.TOPIC_ALREADY_EXISTS | Errors.REQUEST_TIMED_OUT => - // The timeout error is expected because we set timeout=0. This - // nevertheless indicates that the topic metadata was created - // successfully, so we return LEADER_NOT_AVAILABLE. - Errors.LEADER_NOT_AVAILABLE - case error => error - } - - new MetadataResponseTopic() - .setErrorCode(error.code) - .setName(topic) - .setIsInternal(Topic.isInternal(topic)) - } - - case None => - creatableTopics.keySet.toSeq.map { topic => - new MetadataResponseTopic() - .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) - .setName(topic) - .setIsInternal(Topic.isInternal(topic)) - } - } - - creatableTopicResponses - } finally { - clearInflightRequests(creatableTopics) - } - } - private def sendCreateTopicRequest( creatableTopics: Map[String, CreatableTopic], metadataRequestContext: Option[RequestContext] @@ -189,10 +114,6 @@ class DefaultAutoTopicCreationManager( } } - val channelManager = this.channelManager.getOrElse { - throw new IllegalStateException("Channel manager must be defined in order to send CreateTopic requests.") - } - val request = metadataRequestContext.map { context => val requestVersion = channelManager.controllerApiVersions.toScala match { diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 7ab6219f159..3fd2f7789c9 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -385,8 +385,8 @@ class BrokerServer( producerIdManagerSupplier, metrics, metadataCache, Time.SYSTEM) autoTopicCreationManager = new DefaultAutoTopicCreationManager( - config, Some(clientToControllerChannelManager), None, None, - groupCoordinator, transactionCoordinator, shareCoordinator) + config, clientToControllerChannelManager, groupCoordinator, + transactionCoordinator, shareCoordinator) dynamicConfigHandlers = Map[String, ConfigHandler]( ConfigType.TOPIC -> new TopicConfigHandler(replicaManager, config, quotaManagers, None),