KAFKA-8875; CreateTopic API should check topic existence before replication factor (#7298)

If the topic already exists, `handleCreateTopicsRequest` should return TopicExistsException even given an invalid config (replication factor for instance).

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>, Jason Gustafson <jason@confluent.io>
This commit is contained in:
huxi 2019-09-12 05:24:27 +08:00 committed by Jason Gustafson
parent 6a3a580399
commit d3559f628b
2 changed files with 21 additions and 1 deletions

View File

@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin.AlterConfigOp
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.errors.{ApiException, InvalidConfigurationException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
import org.apache.kafka.common.metrics.Metrics
@ -88,6 +88,9 @@ class AdminManager(val config: KafkaConfig,
val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) }
val metadata = toCreate.values.map(topic =>
try {
if (metadataCache.contains(topic.name))
throw new TopicExistsException(s"Topic '${topic.name}' already exists.")
val configs = new Properties()
topic.configs.asScala.foreach { entry =>
configs.setProperty(entry.name, entry.value)

View File

@ -229,6 +229,23 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
waitForTopics(client, List(), topics)
}
@Test
def testCreateExistingTopicsThrowTopicExistsException(): Unit = {
client = AdminClient.create(createConfig())
val topic = "mytopic"
val topics = Seq(topic)
val newTopics = Seq(new NewTopic(topic, 1, 1.toShort))
client.createTopics(newTopics.asJava).all.get()
waitForTopics(client, topics, List())
val newTopicsWithInvalidRF = Seq(new NewTopic(topic, 1, (servers.size + 1).toShort))
val e = intercept[ExecutionException] {
client.createTopics(newTopicsWithInvalidRF.asJava, new CreateTopicsOptions().validateOnly(true)).all.get()
}
assertTrue(e.getCause.isInstanceOf[TopicExistsException])
}
@Test
def testMetadataRefresh(): Unit = {
client = AdminClient.create(createConfig())