From d3559f628b2ccb23a9faf531796675376ac06abb Mon Sep 17 00:00:00 2001 From: huxi Date: Thu, 12 Sep 2019 05:24:27 +0800 Subject: [PATCH] KAFKA-8875; CreateTopic API should check topic existence before replication factor (#7298) If the topic already exists, `handleCreateTopicsRequest` should return TopicExistsException even given an invalid config (replication factor for instance). Reviewers: Rajini Sivaram , Jason Gustafson --- .../main/scala/kafka/server/AdminManager.scala | 5 ++++- .../kafka/api/AdminClientIntegrationTest.scala | 17 +++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 7b71e35a88e..1c4115e2006 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp import org.apache.kafka.clients.admin.AlterConfigOp.OpType import org.apache.kafka.common.config.ConfigDef.ConfigKey import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig} -import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException} +import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException} import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.metrics.Metrics @@ -88,6 +88,9 @@ class AdminManager(val config: KafkaConfig, val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) } val metadata = toCreate.values.map(topic => try { + if (metadataCache.contains(topic.name)) + throw new TopicExistsException(s"Topic '${topic.name}' already exists.") + val configs = new Properties() topic.configs.asScala.foreach { entry => configs.setProperty(entry.name, entry.value) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 02b8f1afc70..27c435be3be 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -229,6 +229,23 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { waitForTopics(client, List(), topics) } + @Test + def testCreateExistingTopicsThrowTopicExistsException(): Unit = { + client = AdminClient.create(createConfig()) + val topic = "mytopic" + val topics = Seq(topic) + val newTopics = Seq(new NewTopic(topic, 1, 1.toShort)) + + client.createTopics(newTopics.asJava).all.get() + waitForTopics(client, topics, List()) + + val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size + 1).toShort)) + val e = intercept[ExecutionException] { + client.createTopics(newTopicsWithInvalidRF.asJava, new CreateTopicsOptions().validateOnly(true)).all.get() + } + assertTrue(e.getCause.isInstanceOf[TopicExistsException]) + } + @Test def testMetadataRefresh(): Unit = { client = AdminClient.create(createConfig())